package org.apache.kafka.clients.consumer.internals;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.class */
/**
 * Background thread of the consumer. Each iteration of its loop drains the
 * application event queue, hands every event to the
 * {@link ApplicationEventProcessor}, polls every registered
 * {@link RequestManager}, forwards the requests they produced to the
 * {@link NetworkClientDelegate}, and finally polls the network client.
 *
 * <p>The loop runs until {@link #close()} is called or an unexpected error
 * escapes {@link #runOnce()}. {@link #close()} may be invoked from a thread
 * other than the one executing {@link #run()}, which is why the
 * {@code running} flag is {@code volatile}.
 */
public class DefaultBackgroundThread extends KafkaThread {
    /** Upper bound on the timeout handed to the network client in one loop iteration. */
    private static final long MAX_POLL_TIMEOUT_MS = 5000;
    private static final String BACKGROUND_THREAD_NAME = "consumer_background_thread";

    private final Time time;
    private final Logger log;
    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private final ConsumerMetadata metadata;
    private final ConsumerConfig config;
    private final ApplicationEventProcessor applicationEventProcessor;
    private final NetworkClientDelegate networkClientDelegate;
    private final ErrorEventHandler errorEventHandler;
    private final GroupState groupState;
    /** Written by {@link #close()} and read by the loop in {@link #run()}; volatile for cross-thread visibility. */
    private volatile boolean running;
    /** A manager that is not configured is stored as an empty {@link Optional}. */
    private final Map<RequestManager.Type, Optional<RequestManager>> requestManagerRegistry;

    /**
     * Package-private constructor that takes every collaborator pre-built.
     *
     * @param coordinatorRequestManager may be {@code null}; registered as an empty {@link Optional}
     * @param commitRequestManager      may be {@code null}; registered as an empty {@link Optional}
     */
    DefaultBackgroundThread(Time time, ConsumerConfig consumerConfig, LogContext logContext, BlockingQueue<ApplicationEvent> blockingQueue, BlockingQueue<BackgroundEvent> blockingQueue2, ErrorEventHandler errorEventHandler, ApplicationEventProcessor applicationEventProcessor, ConsumerMetadata consumerMetadata, NetworkClientDelegate networkClientDelegate, GroupState groupState, CoordinatorRequestManager coordinatorRequestManager, CommitRequestManager commitRequestManager) {
        super(BACKGROUND_THREAD_NAME, true);
        this.time = time;
        this.running = true;
        this.log = logContext.logger(getClass());
        this.applicationEventQueue = blockingQueue;
        this.backgroundEventQueue = blockingQueue2;
        this.applicationEventProcessor = applicationEventProcessor;
        this.config = consumerConfig;
        this.metadata = consumerMetadata;
        this.networkClientDelegate = networkClientDelegate;
        this.errorEventHandler = errorEventHandler;
        this.groupState = groupState;
        this.requestManagerRegistry = new HashMap<>();
        this.requestManagerRegistry.put(RequestManager.Type.COORDINATOR, Optional.ofNullable(coordinatorRequestManager));
        this.requestManagerRegistry.put(RequestManager.Type.COMMIT, Optional.ofNullable(commitRequestManager));
    }

    /**
     * Builds the thread together with its network delegate, error handler,
     * group state, request managers and event processor.
     *
     * @throws KafkaException if any collaborator fails to construct; the
     *                        original exception is attached as the cause
     */
    public DefaultBackgroundThread(Time time, ConsumerConfig consumerConfig, GroupRebalanceConfig groupRebalanceConfig, LogContext logContext, BlockingQueue<ApplicationEvent> blockingQueue, BlockingQueue<BackgroundEvent> blockingQueue2, ConsumerMetadata consumerMetadata, KafkaClient kafkaClient) {
        super(BACKGROUND_THREAD_NAME, true);
        try {
            this.time = time;
            this.log = logContext.logger(getClass());
            this.applicationEventQueue = blockingQueue;
            this.backgroundEventQueue = blockingQueue2;
            this.config = consumerConfig;
            this.metadata = consumerMetadata;
            this.networkClientDelegate = new NetworkClientDelegate(this.time, this.config, logContext, kafkaClient);
            this.running = true;
            this.errorEventHandler = new ErrorEventHandler(this.backgroundEventQueue);
            this.groupState = new GroupState(groupRebalanceConfig);
            this.requestManagerRegistry = Collections.unmodifiableMap(buildRequestManagerRegistry(logContext));
            this.applicationEventProcessor = new ApplicationEventProcessor(blockingQueue2, this.requestManagerRegistry);
        } catch (Exception e) {
            // Release whatever was created before the failure. close() tolerates
            // fields that were never assigned.
            close();
            // Wrap the exception itself: e.getCause() is frequently null and
            // would discard the real failure and its stack trace.
            throw new KafkaException("Failed to construct background processor", e);
        }
    }

    /**
     * Creates the request managers. Both are created only when the group state
     * carries a group id; otherwise both registry entries are empty.
     */
    private Map<RequestManager.Type, Optional<RequestManager>> buildRequestManagerRegistry(LogContext logContext) {
        CoordinatorRequestManager coordinatorRequestManager = null;
        CommitRequestManager commitRequestManager = null;
        if (this.groupState.groupId != null) {
            coordinatorRequestManager = new CoordinatorRequestManager(this.time, logContext, this.config.getLong("retry.backoff.ms").longValue(), this.errorEventHandler, this.groupState.groupId);
            // NOTE(review): the third argument is passed as null here - confirm the
            // commit manager tolerates that.
            commitRequestManager = new CommitRequestManager(this.time, logContext, null, this.config, coordinatorRequestManager, this.groupState);
        }
        Map<RequestManager.Type, Optional<RequestManager>> registry = new HashMap<>();
        registry.put(RequestManager.Type.COORDINATOR, Optional.ofNullable(coordinatorRequestManager));
        registry.put(RequestManager.Type.COMMIT, Optional.ofNullable(commitRequestManager));
        return registry;
    }

    /**
     * Runs {@link #runOnce()} until the thread is closed. A
     * {@link WakeupException} is logged and the loop continues; any other
     * throwable is logged and rethrown wrapped in a {@link RuntimeException}.
     * {@link #close()} is always invoked on the way out.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            this.log.debug("Background thread started");
            while (this.running) {
                try {
                    runOnce();
                } catch (WakeupException e) {
                    this.log.debug("WakeupException caught, background thread won't be interrupted");
                }
            }
        } catch (Throwable th) {
            this.log.error("The background thread failed due to unexpected error", th);
            throw new RuntimeException(th);
        } finally {
            close();
            this.log.debug("{} closed", getClass());
        }
    }

    /**
     * Performs one loop iteration: processes all queued application events,
     * polls every present request manager, and polls the network client with
     * the smallest wait time any manager reported, capped at
     * {@link #MAX_POLL_TIMEOUT_MS}.
     */
    void runOnce() {
        drain();
        final long currentTimeMs = this.time.milliseconds();
        final long pollWaitTimeMs = this.requestManagerRegistry.values().stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .map(requestManager -> requestManager.poll(currentTimeMs))
                .mapToLong(this::handlePollResult)
                .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
        this.networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
    }

    /** Processes every application event currently waiting in the queue. */
    private void drain() {
        for (ApplicationEvent applicationEvent : pollApplicationEvent()) {
            this.log.debug("Consuming application event: {}", applicationEvent);
            consumeApplicationEvent(applicationEvent);
        }
    }

    /**
     * Hands any unsent requests of a poll result to the network client.
     *
     * @return the poll result's {@code timeUntilNextPollMs}
     */
    long handlePollResult(NetworkClientDelegate.PollResult pollResult) {
        if (!pollResult.unsentRequests.isEmpty()) {
            this.networkClientDelegate.addAll(pollResult.unsentRequests);
        }
        return pollResult.timeUntilNextPollMs;
    }

    /**
     * Removes all currently queued application events without blocking.
     *
     * @return the drained events; empty when nothing was queued
     */
    private Queue<ApplicationEvent> pollApplicationEvent() {
        // drainTo on an empty queue simply transfers nothing, so no emptiness
        // pre-check is needed.
        LinkedList<ApplicationEvent> drained = new LinkedList<>();
        this.applicationEventQueue.drainTo(drained);
        return drained;
    }

    /**
     * Passes a single event to the application event processor.
     *
     * @throws NullPointerException if {@code applicationEvent} is {@code null}
     */
    private void consumeApplicationEvent(ApplicationEvent applicationEvent) {
        Objects.requireNonNull(applicationEvent);
        this.applicationEventProcessor.process(applicationEvent);
    }

    /** @return {@code true} until {@link #close()} has been called */
    public boolean isRunning() {
        return this.running;
    }

    /** Forwards a wakeup to the network client delegate. */
    public void wakeup() {
        this.networkClientDelegate.wakeup();
    }

    /**
     * Stops the loop, wakes up the network client and quietly closes the
     * network client delegate and the consumer metadata. Safe to call when
     * construction failed part-way and the delegate was never created.
     */
    public void close() {
        this.running = false;
        // The delegate is null only when the public constructor failed before
        // creating it; skipping the wakeup avoids masking that failure with an NPE.
        if (this.networkClientDelegate != null) {
            wakeup();
        }
        org.apache.kafka.common.utils.Utils.closeQuietly(this.networkClientDelegate, "network client utils");
        org.apache.kafka.common.utils.Utils.closeQuietly(this.metadata, "consumer metadata client");
    }
}
