package org.apache.kafka.clients.consumer.internals.events;

import java.io.Closeable;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/kafka-clients-3.7.0.jar:org/apache/kafka/clients/consumer/internals/events/EventProcessor.class */
/**
 * Base class for processors that pull events of type {@code T} off a {@link BlockingQueue}
 * and hand each one to the subclass-provided {@link #process(Object)} method.
 *
 * <p>Closing the processor drains whatever is still queued and, for every drained event that
 * is a {@code CompletableEvent} whose future is not yet done, completes that future
 * exceptionally with a {@link KafkaException}.
 *
 * @param <T> the type of event held in the queue
 */
public abstract class EventProcessor<T> implements Closeable {
    private final Logger log;
    private final BlockingQueue<T> eventQueue;
    // Guards close(): assertOpen(...) is consulted before processing, close(...) wraps closeInternal().
    private final IdempotentCloser closer = new IdempotentCloser();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/kafka-clients-3.7.0.jar:org/apache/kafka/clients/consumer/internals/events/EventProcessor$ProcessHandler.class */
    /**
     * Callback invoked by {@link #process(ProcessHandler)} once per drained event.
     *
     * @param <T> the type of event being processed
     */
    public interface ProcessHandler<T> {
        /**
         * Reports the outcome of processing a single event.
         *
         * @param t        the event that was processed
         * @param optional empty when processing finished without throwing; otherwise the failure,
         *                 wrapped as a {@link KafkaException}
         */
        void onProcess(T t, Optional<KafkaException> optional);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a processor that reads from the given queue.
     *
     * @param logContext    used to create this class's logger
     * @param blockingQueue the queue events are drained from
     */
    public EventProcessor(LogContext logContext, BlockingQueue<T> blockingQueue) {
        this.log = logContext.logger(EventProcessor.class);
        this.eventQueue = blockingQueue;
    }

    /**
     * Processes the currently queued events; the concrete strategy is supplied by the subclass.
     *
     * @return subclass-defined; NOTE(review): presumably mirrors {@link #process(ProcessHandler)},
     *         i.e. {@code true} when at least one event was handled — confirm against implementations
     */
    public abstract boolean process();

    /**
     * Handles a single event. Called by {@link #process(ProcessHandler)} for each drained event.
     *
     * @param t the event to handle; never {@code null} when invoked from
     *          {@link #process(ProcessHandler)}, which null-checks first
     */
    protected abstract void process(T t);

    /**
     * Closes the processor through the {@link IdempotentCloser}: {@link #closeInternal()} is passed
     * as the close action, and a warning is logged by the second callback (presumably the one the
     * closer runs when the processor was already closed — see {@code IdempotentCloser#close}).
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closer.close(this::closeInternal, () -> {
            this.log.warn("The event processor was already closed");
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Drains the queue and processes every drained event in order, reporting each outcome to
     * {@code processHandler}.
     *
     * <p>A failure while handling one event does not stop the loop: the throwable is wrapped via
     * {@code ConsumerUtils.maybeWrapAsKafkaException} and passed to the handler, then the next
     * event is processed. Note that the same catch also covers an exception thrown by the
     * handler's own success callback, in which case the handler is invoked a second time for that
     * event with the error. An exception thrown by the handler's error callback propagates.
     *
     * @param processHandler callback notified after each event
     * @return {@code false} if the queue was empty, {@code true} if at least one event was drained
     */
    public boolean process(ProcessHandler<T> processHandler) {
        this.closer.assertOpen("The processor was previously closed, so no further processing can occur");
        List<T> events = drain();
        if (events.isEmpty()) {
            this.log.trace("No events to process");
            return false;
        }
        try {
            this.log.trace("Starting processing of {} event{}", events.size(), events.size() == 1 ? "" : "s");
            for (T event : events) {
                try {
                    Objects.requireNonNull(event, "Attempted to process a null event");
                    this.log.trace("Processing event: {}", event);
                    // Dispatch to the abstract per-event handler process(T); no cast is needed
                    // (or valid) here since the argument is already of type T.
                    process(event);
                    processHandler.onProcess(event, Optional.empty());
                } catch (Throwable th) {
                    processHandler.onProcess(event, Optional.of(ConsumerUtils.maybeWrapAsKafkaException(th)));
                }
            }
            return true;
        } finally {
            // Logged on both normal completion and when an exception escapes the loop.
            this.log.trace("Completed processing");
        }
    }

    /**
     * Close action: drains any remaining events and exceptionally completes the futures of
     * those that are {@code CompletableEvent}s and not already done. All drained events are
     * discarded afterwards.
     */
    private void closeInternal() {
        this.log.trace("Closing event processor");
        List<T> incompleteEvents = drain();
        if (incompleteEvents.isEmpty()) {
            return;
        }
        // One shared exception instance is used for every future completed below.
        KafkaException exception = new KafkaException("The consumer is closed");
        incompleteEvents.stream()
                .filter(event -> event instanceof CompletableEvent)
                .map(event -> ((CompletableEvent<?>) event).future())
                .filter(future -> !future.isDone())
                .forEach(future -> {
                    this.log.debug("Completing {} with exception {}", future, exception.getMessage());
                    future.completeExceptionally(exception);
                });
        this.log.debug("Discarding {} events because the consumer is closing", incompleteEvents.size());
    }

    /**
     * Moves every event currently available in the queue into a new list.
     *
     * @return the drained events in queue order; empty if nothing was queued
     */
    private List<T> drain() {
        List<T> events = new LinkedList<>();
        this.eventQueue.drainTo(events);
        return events;
    }
}
