package io.github.nomisRev.kafka.receiver.internals;

import io.github.nomisRev.kafka.receiver.CommitStrategyKt;
import io.github.nomisRev.kafka.receiver.Offset;
import io.github.nomisRev.kafka.receiver.ReceiverSettings;
import io.github.nomisRev.kafka.receiver.internals.CommittableBatch;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import kotlin.Metadata;
import kotlin.Result;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.SafeContinuation;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.coroutines.jvm.internal.DebugProbesKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import kotlin.ranges.RangesKt;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineExceptionHandlerKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.Job;
import kotlinx.coroutines.channels.BufferOverflow;
import kotlinx.coroutines.channels.Channel;
import kotlinx.coroutines.channels.ChannelKt;
import kotlinx.coroutines.channels.ChannelResult;
import kotlinx.coroutines.flow.Flow;
import kotlinx.coroutines.flow.FlowKt;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;

/* compiled from: EventLoop.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��Ê\u0001\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010��\n��\n\u0002\u0010\u001e\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u0003\n\u0002\u0010\u000b\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\b\n\u0002\u0010#\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0007\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010\t\n��\n\u0002\u0010\b\n\u0002\b\u0002\n\u0002\u0010$\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0006\n\u0002\u0018\u0002\n\u0002\b\u000b\b��\u0018��*\u0004\b��\u0010\u0001*\u0004\b\u0001\u0010\u00022\u00020\u0003:\u0002]^Bu\u0012\f\u0010\u0004\u001a\b\u0012\u0004\u0012\u00020\u00060\u0005\u0012\u0012\u0010\u0007\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\b\u0012\u0012\u0010\t\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\n\u0012\u0006\u0010\u000b\u001a\u00020\f\u0012\u0006\u0010\r\u001a\u00020\u000e\u0012\b\b\u0002\u0010\u000f\u001a\u00020\u0010\u0012\b\b\u0002\u0010\u0011\u001a\u00020\u0012\u0012\u0014\b\u0002\u0010\u0013\u001a\u000e\u0012\u0004\u0012\u00020\u0015\u0012\u0004\u0012\u00020\u00160\u0014¢\u0006\u0002\u0010\u0017J\u001b\u00103\u001a\u00020\"2\u0006\u00104\u001a\u000205H\u0086@ø\u0001��¢\u0006\u0004\b6\u00107J\u0010\u00108\u001a\u00020\"2\u0006\u00109\u001a\u00020\u0015H\u0002J\b\u0010:\u001a\u00020\"H\u0003J\u0010\u0010;\u001a\u00020\"2\u0006\u0010<\u001a\u00020=H\u0002J\u0018\u0010>\u001a\u00020\"2\u0006\u0010<\u001a\u00020=2\u0006\u0010?\u001a\u00020\u0015H\u0003J\u0018\u0010@\u001a\u00020\"2\u0006\u0010A\u001a\u00020B2\u0006\u0010C\u001a\u0
0020DH\u0003J$\u0010E\u001a\u00020\"2\u0006\u0010<\u001a\u00020=2\u0012\u0010F\u001a\u000e\u0012\u0004\u0012\u00020.\u0012\u0004\u0012\u00020H0GH\u0003J\u0010\u0010I\u001a\u00020\"2\u0006\u0010<\u001a\u00020=H\u0002J!\u0010J\u001a\u00020K2\u0012\u0010L\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010MH��¢\u0006\u0002\bNJ\u0016\u0010O\u001a\u00020\"2\f\u0010P\u001a\b\u0012\u0004\u0012\u00020.0\u0005H\u0003J\b\u0010Q\u001a\u00020\u0016H\u0003J\b\u0010R\u001a\u00020\"H\u0003J\u001f\u0010S\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u001c0TH��¢\u0006\u0002\bUJ\u0010\u0010V\u001a\u00020\"2\u0006\u0010W\u001a\u00020\u0016H\u0003J\u0006\u0010X\u001a\u00020\"J\b\u0010Y\u001a\u00020\"H\u0003J\u001c\u0010Z\u001a\u00020\"2\f\u0010\u0004\u001a\b\u0012\u0004\u0012\u00020\u00060\u0005H\u0082@¢\u0006\u0002\u0010[J\u0016\u0010\\\u001a\u00020\"2\f\u0010P\u001a\b\u0012\u0004\u0012\u00020.0\u0005H\u0003R\u000e\u0010\u0011\u001a\u00020\u0012X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0018\u001a\u00020\u0019X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082\u0004¢\u0006\u0002\n��R \u0010\u001a\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u001c0\u001bX\u0082\u0004¢\u0006\u0002\n��R\u0011\u0010\u001d\u001a\u00020\u001e¢\u0006\b\n��\u001a\u0004\b\u001f\u0010 
R\u0014\u0010!\u001a\b\u0012\u0004\u0012\u00020\"0\u001bX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010#\u001a\u00020$X\u0082\u0004¢\u0006\b\n��\u0012\u0004\b%\u0010&R\u000e\u0010'\u001a\u00020\u0010X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010(\u001a\u00020\u0019X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\t\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\nX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010)\u001a\u00020\u0010X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010*\u001a\u00020\u0010X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u0013\u001a\u000e\u0012\u0004\u0012\u00020\u0015\u0012\u0004\u0012\u00020\u00160\u0014X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010+\u001a\u00020\u0010X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\r\u001a\u00020\u000eX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010,\u001a\b\u0012\u0004\u0012\u00020.0-X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010/\u001a\u000200X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000b\u001a\u00020\fX\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u0007\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\bX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0004\u001a\b\u0012\u0004\u0012\u00020\u00060\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u00101\u001a\u000202X\u0082\u0004¢\u0006\u0002\n��\u0082\u0002\u0007\n\u0005\b¡\u001e0\u0001¨\u0006_"}, d2 = {"Lio/github/nomisRev/kafka/receiver/internals/EventLoop;", "K", "V", "", "topicNames", "", "", "settings", "Lio/github/nomisRev/kafka/receiver/ReceiverSettings;", "consumer", "Lorg/apache/kafka/clients/consumer/Consumer;", "scope", "Lkotlinx/coroutines/CoroutineScope;", "outerContext", "Lkotlin/coroutines/CoroutineContext;", "awaitingTransaction", "Ljava/util/concurrent/atomic/AtomicBoolean;", "ackMode", "Lio/github/nomisRev/kafka/receiver/internals/AckMode;", "isRetryableCommit", "Lkotlin/Function1;", "", "", 
"(Ljava/util/Collection;Lio/github/nomisRev/kafka/receiver/ReceiverSettings;Lorg/apache/kafka/clients/consumer/Consumer;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Ljava/util/concurrent/atomic/AtomicBoolean;Lio/github/nomisRev/kafka/receiver/internals/AckMode;Lkotlin/jvm/functions/Function1;)V", "asyncCommitsInProgress", "Ljava/util/concurrent/atomic/AtomicInteger;", "channel", "Lkotlinx/coroutines/channels/Channel;", "Lorg/apache/kafka/clients/consumer/ConsumerRecords;", "commitBatch", "Lio/github/nomisRev/kafka/receiver/internals/CommittableBatch;", "getCommitBatch", "()Lio/github/nomisRev/kafka/receiver/internals/CommittableBatch;", "commitBatchSignal", "", "commitManager", "Lkotlinx/coroutines/Job;", "getCommitManager$annotations", "()V", "commitPending", "consecutiveCommitFailures", "isPaused", "isPolling", "isRetryingCommit", "pausedPartitionsByUser", "", "Lorg/apache/kafka/common/TopicPartition;", "pollTimeout", "Ljava/time/Duration;", "utmostOnceOffsets", "Lio/github/nomisRev/kafka/receiver/internals/UtmostOnceOffsets;", "close", "timeout", "Lkotlin/time/Duration;", "close-VtjQ1oo", "(JLkotlin/coroutines/Continuation;)Ljava/lang/Object;", "closeChannel", "e", "commit", "commitAsync", "commitArgs", "Lio/github/nomisRev/kafka/receiver/internals/CommittableBatch$CommitArgs;", "commitFailure", "exception", "commitOnClose", "closeEndTime", "", "maxAttempts", "", "commitSuccess", "offsets", "", "Lorg/apache/kafka/clients/consumer/OffsetAndMetadata;", "commitSync", "offsetFromRecord", "Lio/github/nomisRev/kafka/receiver/Offset;", "record", "Lorg/apache/kafka/clients/consumer/ConsumerRecord;", "offsetFromRecord$kotlin_kafka", "partitionsRevoked", "partitions", "pauseAndWakeupIfNeeded", "poll", "receive", "Lkotlinx/coroutines/flow/Flow;", "receive$kotlin_kafka", "runCommitIfRequired", "force", "scheduleCommitIfRequired", "schedulePollAfterRetrying", "subscribe", "(Ljava/util/Collection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", 
"traceCommitted", "CommitOffset", "RebalanceListener", "kotlin-kafka"})
@SourceDebugExtension({"SMAP\nEventLoop.kt\nKotlin\n*S Kotlin\n*F\n+ 1 EventLoop.kt\nio/github/nomisRev/kafka/receiver/internals/EventLoop\n+ 2 Channel.kt\nkotlinx/coroutines/channels/ChannelKt\n+ 3 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,574:1\n522#2,6:575\n556#2,5:581\n538#2,5:586\n1855#3,2:591\n1855#3,2:593\n1549#3:595\n1620#3,3:596\n*S KotlinDebug\n*F\n+ 1 EventLoop.kt\nio/github/nomisRev/kafka/receiver/internals/EventLoop\n*L\n173#1:575,6\n174#1:581,5\n175#1:586,5\n366#1:591,2\n387#1:593,2\n458#1:595\n458#1:596,3\n*E\n"})
/* loaded from: input_file:io/github/nomisRev/kafka/receiver/internals/EventLoop.class */
public final class EventLoop<K, V> {

    /** Topic names handed to the constructor ("topicNames"). */
    @NotNull
    private final Collection<String> topicNames;

    /** Receiver configuration; read for the poll timeout and the max-deferred-commits limit. */
    @NotNull
    private final ReceiverSettings<K, V> settings;

    /** The Kafka consumer that this loop polls, pauses, resumes and wakes up. */
    @NotNull
    private final Consumer<K, V> consumer;

    /** Scope in which the commit manager, scheduled commits and the back-pressure coroutine are launched. */
    @NotNull
    private final CoroutineScope scope;

    /** Context passed to the coroutine that poll() launches when the records channel refuses a batch. */
    @NotNull
    private final CoroutineContext outerContext;

    /** While true, poll() pauses every assigned partition ("Paused - awaiting transaction"). */
    @NotNull
    private final AtomicBoolean awaitingTransaction;

    /** Selects the commit path in commit(); ATMOST_ONCE also skips the forced commit on revocation. */
    @NotNull
    private final AckMode ackMode;

    /**
     * Predicate over commit failures. The synthetic defaults constructor supplies
     * {@code th instanceof RetriableCommitFailedException}; its call sites are not visible in this part of the file.
     */
    @NotNull
    private final Function1<Throwable, Boolean> isRetryableCommit;

    /** Starts as true; poll() sets it to false when a batch cannot be handed to the channel immediately. */
    @NotNull
    private final AtomicBoolean isPolling;

    /** Starts as false; set by pauseAndWakeupIfNeeded() and cleared by poll() when partitions are resumed. */
    @NotNull
    private final AtomicBoolean isPaused;

    /** Channel that poll() offers record batches to; exposed to consumers as a Flow by receive$kotlin_kafka(). */
    @NotNull
    private final Channel<ConsumerRecords<K, V>> channel;

    /** java.time.Duration built from the settings' poll timeout and passed to Consumer.poll. */
    @NotNull
    private final Duration pollTimeout;

    /**
     * Partitions that were already paused on the consumer at the moment this loop paused everything;
     * they are excluded when poll() resumes partitions.
     */
    @NotNull
    private final Set<TopicPartition> pausedPartitionsByUser;

    /** Unit channel created with capacity 0. NOTE(review): its users are outside this part of the file. */
    @NotNull
    private final Channel<Unit> commitBatchSignal;

    /** NOTE(review): not referenced in the visible part of the file. */
    @NotNull
    private final UtmostOnceOffsets utmostOnceOffsets;

    /** Batch of offsets awaiting commit; exposed through getCommitBatch(). */
    @NotNull
    private final CommittableBatch commitBatch;

    /** Set when a commit has been requested; commit() claims it with compareAndSet(true, false). */
    @NotNull
    private final AtomicBoolean commitPending;

    /** Incremented by commitAsync() before it issues a commit. */
    @NotNull
    private final AtomicInteger asyncCommitsInProgress;

    /** NOTE(review): not referenced in the visible part of the file. */
    @NotNull
    private final AtomicInteger consecutiveCommitFailures;

    /** While true, poll() keeps the consumer paused and no new commit is started or scheduled. */
    @NotNull
    private final AtomicBoolean isRetryingCommit;

    /** Job created with CoroutineStart.LAZY in the constructor; it runs EventLoop$commitManager$1. */
    @NotNull
    private final Job commitManager;

    /* compiled from: EventLoop.kt */
    @Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��6\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\t\n��\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\t\n\u0002\u0010\u000e\n��\b\u0082\u0004\u0018��2\u00020\u0001B+\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\f\u0010\b\u001a\b\u0012\u0004\u0012\u00020\n0\t¢\u0006\u0002\u0010\u000bJ\u000e\u0010\u0012\u001a\u00020\nH\u0096@¢\u0006\u0002\u0010\u0013J\u000e\u0010\u0014\u001a\u00020\nH\u0096@¢\u0006\u0002\u0010\u0013J\b\u0010\u0015\u001a\u00020\u0007H\u0002J\b\u0010\u0016\u001a\u00020\u0017H\u0016R\u000e\u0010\f\u001a\u00020\rX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0004\u001a\u00020\u0005X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u000e\u0010\u000fR\u0014\u0010\b\u001a\b\u0012\u0004\u0012\u00020\n0\tX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0002\u001a\u00020\u0003X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u0010\u0010\u0011¨\u0006\u0018"}, d2 = {"Lio/github/nomisRev/kafka/receiver/internals/EventLoop$CommitOffset;", "Lio/github/nomisRev/kafka/receiver/Offset;", "topicPartition", "Lorg/apache/kafka/common/TopicPartition;", "offset", "", "commitBatchSize", "", "reachedMaxCommitBatchSize", "Lkotlinx/coroutines/channels/Channel;", "", "(Lio/github/nomisRev/kafka/receiver/internals/EventLoop;Lorg/apache/kafka/common/TopicPartition;JILkotlinx/coroutines/channels/Channel;)V", "acknowledged", "Ljava/util/concurrent/atomic/AtomicBoolean;", "getOffset", "()J", "getTopicPartition", "()Lorg/apache/kafka/common/TopicPartition;", "acknowledge", "(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "commit", "maybeUpdateOffset", "toString", "", "kotlin-kafka"})
    /* loaded from: input_file:io/github/nomisRev/kafka/receiver/internals/EventLoop$CommitOffset.class */
    /**
     * {@link Offset} implementation handed out for a single consumed record.
     *
     * <p>The first call to {@link #commit} or {@link #acknowledge} records the offset in the enclosing
     * loop's {@link CommittableBatch}; later calls only read the current batch size.
     */
    private final class CommitOffset implements Offset {

        @NotNull
        private final TopicPartition topicPartition;
        private final long offset;
        private final int commitBatchSize;

        @NotNull
        private final Channel<Unit> reachedMaxCommitBatchSize;

        /** Flipped exactly once, by whichever of commit/acknowledge runs first. */
        @NotNull
        private final AtomicBoolean acknowledged;
        final /* synthetic */ EventLoop<K, V> this$0;

        /**
         * @param eventLoop synthetic reference to the enclosing event loop
         * @param topicPartition partition the record was read from
         * @param j offset of the record
         * @param i commit batch size; values below 1 disable the signal sent by {@link #acknowledge}
         * @param channel channel that receives {@code Unit} when the batch size limit is reached
         */
        public CommitOffset(EventLoop eventLoop, @NotNull TopicPartition topicPartition, long j, int i, @NotNull Channel<Unit> channel) {
            Intrinsics.checkNotNullParameter(topicPartition, "topicPartition");
            Intrinsics.checkNotNullParameter(channel, "reachedMaxCommitBatchSize");
            this.this$0 = eventLoop;
            this.topicPartition = topicPartition;
            this.offset = j;
            this.commitBatchSize = i;
            this.reachedMaxCommitBatchSize = channel;
            this.acknowledged = new AtomicBoolean(false);
        }

        @Override // io.github.nomisRev.kafka.receiver.Offset
        @NotNull
        public TopicPartition getTopicPartition() {
            return this.topicPartition;
        }

        @Override // io.github.nomisRev.kafka.receiver.Offset
        public long getOffset() {
            return this.offset;
        }

        /**
         * Registers the caller's continuation with the commit batch and asks the event loop to schedule a commit.
         *
         * @return {@code Unit} immediately when the batch reports nothing to commit, otherwise the
         *     suspension marker or {@code Unit} once the continuation has been resumed
         */
        @Override // io.github.nomisRev.kafka.receiver.Offset
        @Nullable
        public Object commit(@NotNull Continuation<? super Unit> continuation) {
            if (maybeUpdateOffset() <= 0) {
                return Unit.INSTANCE;
            }
            EventLoop<K, V> eventLoop = this.this$0;
            // Declared as SafeContinuation so that getOrThrow() below is actually a member of the static type.
            SafeContinuation<? super Unit> safeContinuation = new SafeContinuation<>(IntrinsicsKt.intercepted(continuation));
            eventLoop.getCommitBatch().addContinuation(safeContinuation);
            eventLoop.scheduleCommitIfRequired();
            Object outcome = safeContinuation.getOrThrow();
            if (outcome != IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
                return Unit.INSTANCE;
            }
            DebugProbesKt.probeCoroutineSuspended(continuation);
            return outcome;
        }

        /**
         * Records the offset and, when a positive batch size limit has been reached, sends {@code Unit}
         * on the {@code reachedMaxCommitBatchSize} channel.
         */
        @Override // io.github.nomisRev.kafka.receiver.Offset
        @Nullable
        public Object acknowledge(@NotNull Continuation<? super Unit> continuation) {
            long batchCount = maybeUpdateOffset();
            long limit = this.commitBatchSize;
            // Equivalent to the Kotlin range test "limit in 1..batchCount".
            if (limit < 1 || limit > batchCount) {
                return Unit.INSTANCE;
            }
            Object sent = this.reachedMaxCommitBatchSize.send(Unit.INSTANCE, continuation);
            return sent == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? sent : Unit.INSTANCE;
        }

        /**
         * Adds this offset to the batch on the first call only.
         *
         * @return whatever {@code CommittableBatch.updateOffset} (first call) or
         *     {@code CommittableBatch.batchSize} (later calls) reports — presumably the number of
         *     offsets currently in the batch; confirm against CommittableBatch
         */
        private final int maybeUpdateOffset() {
            CommittableBatch batch = this.this$0.getCommitBatch();
            if (this.acknowledged.compareAndSet(false, true)) {
                return batch.updateOffset(getTopicPartition(), getOffset());
            }
            return batch.batchSize();
        }

        /** Renders as {@code <topicPartition>@<offset>}. */
        @Override
        @NotNull
        public String toString() {
            return getTopicPartition() + "@" + getOffset();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: EventLoop.kt */
    @ConsumerThread
    @Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��(\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n��\n\u0002\u0010\u001f\n\u0002\u0018\u0002\n��\n\u0002\u0010\u001e\n��\n\u0002\u0010 \n��\b\u0083\u0004\u0018��2\u00020\u0001B\u0005¢\u0006\u0002\u0010\u0002J\u0016\u0010\u0003\u001a\u00020\u00042\f\u0010\u0005\u001a\b\u0012\u0004\u0012\u00020\u00070\u0006H\u0017J\u0016\u0010\b\u001a\u00020\u00042\f\u0010\u0005\u001a\b\u0012\u0004\u0012\u00020\u00070\tH\u0017J\u001c\u0010\n\u001a\b\u0012\u0004\u0012\u00020\u00070\u000b2\f\u0010\u0005\u001a\b\u0012\u0004\u0012\u00020\u00070\tH\u0002¨\u0006\f"}, d2 = {"Lio/github/nomisRev/kafka/receiver/internals/EventLoop$RebalanceListener;", "Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;", "(Lio/github/nomisRev/kafka/receiver/internals/EventLoop;)V", "onPartitionsAssigned", "", "partitions", "", "Lorg/apache/kafka/common/TopicPartition;", "onPartitionsRevoked", "", "partitionsToRepause", "", "kotlin-kafka"})
    @SourceDebugExtension({"SMAP\nEventLoop.kt\nKotlin\n*S Kotlin\n*F\n+ 1 EventLoop.kt\nio/github/nomisRev/kafka/receiver/internals/EventLoop$RebalanceListener\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,574:1\n1855#2,2:575\n*S KotlinDebug\n*F\n+ 1 EventLoop.kt\nio/github/nomisRev/kafka/receiver/internals/EventLoop$RebalanceListener\n*L\n239#1:575,2\n*E\n"})
    /* loaded from: input_file:io/github/nomisRev/kafka/receiver/internals/EventLoop$RebalanceListener.class */
    public final class RebalanceListener implements ConsumerRebalanceListener {
        public RebalanceListener() {
        }

        @ConsumerThread
        public void onPartitionsAssigned(@NotNull Collection<TopicPartition> collection) {
            Logger logger;
            Logger logger2;
            Intrinsics.checkNotNullParameter(collection, "partitions");
            EventLoopKt.checkConsumerThread("RebalanceListener.onPartitionsAssigned");
            logger = EventLoopKt.logger;
            logger.debug("onPartitionsAssigned {}", collection);
            boolean z = (!collection.isEmpty()) && ((EventLoop) EventLoop.this).isPaused.get();
            if (z) {
                logger2 = EventLoopKt.logger;
                logger2.debug("Rebalance during back pressure, re-pausing new assignments");
                ((EventLoop) EventLoop.this).consumer.pause(collection);
            }
            if (!((EventLoop) EventLoop.this).pausedPartitionsByUser.isEmpty()) {
                List<TopicPartition> partitionsToRepause = partitionsToRepause(collection);
                if (!z) {
                    if (!partitionsToRepause.isEmpty()) {
                        ((EventLoop) EventLoop.this).consumer.pause(partitionsToRepause);
                    }
                }
            }
            EventLoop.this.traceCommitted(collection);
        }

        @ConsumerThread
        public void onPartitionsRevoked(@NotNull Collection<TopicPartition> collection) {
            Logger logger;
            Intrinsics.checkNotNullParameter(collection, "partitions");
            EventLoopKt.checkConsumerThread("RebalanceListener.onPartitionsRevoked");
            logger = EventLoopKt.logger;
            logger.debug("onPartitionsRevoked {}", collection);
            EventLoop.this.partitionsRevoked(collection);
            EventLoop.this.getCommitBatch().onPartitionsRevoked(collection);
        }

        private final List<TopicPartition> partitionsToRepause(Collection<TopicPartition> collection) {
            EventLoop<K, V> eventLoop = EventLoop.this;
            List createListBuilder = CollectionsKt.createListBuilder();
            for (TopicPartition topicPartition : ((EventLoop) eventLoop).pausedPartitionsByUser) {
                if (collection.contains(topicPartition)) {
                    createListBuilder.add(topicPartition);
                } else {
                    ((EventLoop) eventLoop).pausedPartitionsByUser.remove(topicPartition);
                }
            }
            return CollectionsKt.build(createListBuilder);
        }
    }

    /* compiled from: EventLoop.kt */
    @Metadata(mv = {1, 9, 0}, k = 3, xi = 48)
    /* loaded from: input_file:io/github/nomisRev/kafka/receiver/internals/EventLoop$WhenMappings.class */
    /**
     * Compiler-generated lookup table for switching over {@link AckMode}: the table is indexed by
     * {@code ordinal()} and holds the case label (1..4) used by the switch in {@code commit()}.
     */
    public /* synthetic */ class WhenMappings {
        public static final /* synthetic */ int[] $EnumSwitchMapping$0;

        static {
            int[] caseByOrdinal = new int[AckMode.values().length];
            // Each constant is mapped in its own try block so that a constant missing at runtime
            // (NoSuchFieldError) leaves its slot at 0 without affecting the others.
            try {
                caseByOrdinal[AckMode.MANUAL_ACK.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                caseByOrdinal[AckMode.AUTO_ACK.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                caseByOrdinal[AckMode.ATMOST_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                caseByOrdinal[AckMode.EXACTLY_ONCE.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
            }
            $EnumSwitchMapping$0 = caseByOrdinal;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public EventLoop(@NotNull Collection<String> collection, @NotNull ReceiverSettings<K, V> receiverSettings, @NotNull Consumer<K, V> consumer, @NotNull CoroutineScope coroutineScope, @NotNull CoroutineContext coroutineContext, @NotNull AtomicBoolean atomicBoolean, @NotNull AckMode ackMode, @NotNull Function1<? super Throwable, Boolean> function1) {
        Intrinsics.checkNotNullParameter(collection, "topicNames");
        Intrinsics.checkNotNullParameter(receiverSettings, "settings");
        Intrinsics.checkNotNullParameter(consumer, "consumer");
        Intrinsics.checkNotNullParameter(coroutineScope, "scope");
        Intrinsics.checkNotNullParameter(coroutineContext, "outerContext");
        Intrinsics.checkNotNullParameter(atomicBoolean, "awaitingTransaction");
        Intrinsics.checkNotNullParameter(ackMode, "ackMode");
        Intrinsics.checkNotNullParameter(function1, "isRetryableCommit");
        this.topicNames = collection;
        this.settings = receiverSettings;
        this.consumer = consumer;
        this.scope = coroutineScope;
        this.outerContext = coroutineContext;
        this.awaitingTransaction = atomicBoolean;
        this.ackMode = ackMode;
        this.isRetryableCommit = function1;
        this.isPolling = new AtomicBoolean(true);
        this.isPaused = new AtomicBoolean(false);
        this.channel = ChannelKt.Channel$default(0, (BufferOverflow) null, (Function1) null, 7, (Object) null);
        Duration ofSeconds = Duration.ofSeconds(kotlin.time.Duration.getInWholeSeconds-impl(this.settings.m64getPollTimeoutUwyO8pc()), kotlin.time.Duration.getNanosecondsComponent-impl(r1));
        Intrinsics.checkNotNullExpressionValue(ofSeconds, "toComponents-impl(...)");
        this.pollTimeout = ofSeconds;
        this.pausedPartitionsByUser = new HashSet();
        this.commitBatchSignal = ChannelKt.Channel$default(0, (BufferOverflow) null, (Function1) null, 6, (Object) null);
        this.utmostOnceOffsets = new UtmostOnceOffsets();
        this.commitBatch = new CommittableBatch();
        this.commitPending = new AtomicBoolean();
        this.asyncCommitsInProgress = new AtomicInteger();
        this.consecutiveCommitFailures = new AtomicInteger();
        this.isRetryingCommit = new AtomicBoolean();
        this.commitManager = BuildersKt.launch$default(this.scope, (CoroutineContext) null, CoroutineStart.LAZY, new EventLoop$commitManager$1(this, null), 1, (Object) null);
    }

    /**
     * Synthetic constructor that applies Kotlin default arguments. A set bit in {@code i} means the
     * corresponding argument was omitted by the caller: bit 32 selects a fresh {@code AtomicBoolean(false)},
     * bit 64 selects {@code AckMode.MANUAL_ACK}, and bit 128 selects a predicate that accepts only
     * {@link RetriableCommitFailedException}.
     */
    public /* synthetic */ EventLoop(Collection collection, ReceiverSettings receiverSettings, Consumer consumer, CoroutineScope coroutineScope, CoroutineContext coroutineContext, AtomicBoolean atomicBoolean, AckMode ackMode, Function1 function1, int i, DefaultConstructorMarker defaultConstructorMarker) {
        this(
            collection,
            receiverSettings,
            consumer,
            coroutineScope,
            coroutineContext,
            (i & 32) == 0 ? atomicBoolean : new AtomicBoolean(false),
            (i & 64) == 0 ? ackMode : AckMode.MANUAL_ACK,
            (i & 128) == 0 ? function1 : (Function1<Throwable, Boolean>) th -> {
                Intrinsics.checkNotNullParameter(th, "e");
                return Boolean.valueOf(th instanceof RetriableCommitFailedException);
            });
    }

    /**
     * Exposes the records channel as a Flow, with {@code EventLoop$receive$1} attached as the
     * on-start action and {@code EventLoop$receive$2} as the on-completion action (their bodies
     * live in separate generated classes).
     *
     * @return a flow of the record batches that poll() hands to the channel
     */
    @NotNull
    public final Flow<ConsumerRecords<K, V>> receive$kotlin_kafka() {
        Flow<ConsumerRecords<K, V>> batches = FlowKt.consumeAsFlow(this.channel);
        Flow<ConsumerRecords<K, V>> withStartAction = FlowKt.onStart(batches, new EventLoop$receive$1(this, null));
        return FlowKt.onCompletion(withStartAction, new EventLoop$receive$2(this, null));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs {@code EventLoop$subscribe$2} inside the event loop scope's coroutine context.
     *
     * @param collection topic names forwarded to the generated subscribe block
     * @param continuation the caller's continuation
     * @return the suspension marker when the call suspended, otherwise {@code Unit}
     */
    public final Object subscribe(Collection<String> collection, Continuation<? super Unit> continuation) {
        Object outcome = BuildersKt.withContext(this.scope.getCoroutineContext(), new EventLoop$subscribe$2(this, collection, null), continuation);
        if (outcome == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
            return outcome;
        }
        return Unit.INSTANCE;
    }

    /**
     * Marks the loop as paused and wakes the consumer when this call is the one that flipped the
     * flag while polling is enabled and no commit retry is running.
     *
     * @return {@code true} when this call changed {@code isPaused} from false to true
     */
    @ConsumerThread
    private final boolean pauseAndWakeupIfNeeded() {
        EventLoopKt.checkConsumerThread("pauseAndWakeupIfNeeded");
        boolean pausedByThisCall = !this.isPaused.getAndSet(true);
        boolean wakeConsumer = false;
        if (pausedByThisCall && this.isPolling.get()) {
            wakeConsumer = !this.isRetryingCommit.get();
        }
        // NOTE(review): the first placeholder is labelled "already paused" but receives the value
        // that is true when the loop was NOT paused before this call.
        EventLoopKt.logger.debug("checkAndSetPausedByUs: already paused {}, shouldWakeUpConsumer {}", Boolean.valueOf(pausedByThisCall), Boolean.valueOf(wakeConsumer));
        if (wakeConsumer) {
            this.consumer.wakeup();
        }
        return pausedByThisCall;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * One iteration of the poll loop.
     *
     * <ol>
     *   <li>Runs a pending commit, if any.</li>
     *   <li>Pauses all assigned partitions when polling is not allowed (too many deferred commits,
     *       a commit retry in progress, polling disabled) or a transaction is awaited; otherwise
     *       resumes the partitions this loop had paused.</li>
     *   <li>Polls the consumer; a {@link WakeupException} is treated as an empty result.</li>
     *   <li>Offers a non-empty result to the channel. On success it polls again immediately; on
     *       failure it disables polling and launches {@code EventLoop$poll$3$1} with the batch.</li>
     * </ol>
     *
     * Any other exception is logged and passed to {@code closeChannel}.
     */
    @ConsumerThread
    public final void poll() {
        try {
            runCommitIfRequired(false);
            boolean tooManyDeferred = this.settings.getMaxDeferredCommits() > 0
                && this.commitBatch.deferredCount() >= this.settings.getMaxDeferredCommits();
            boolean pollingAllowed = !tooManyDeferred && !this.isRetryingCommit.get() && this.isPolling.get();

            if (!pollingAllowed) {
                if (pauseAndWakeupIfNeeded()) {
                    // Remember what was already paused so that a later resume leaves it alone.
                    Set<TopicPartition> alreadyPaused = this.consumer.paused();
                    Intrinsics.checkNotNullExpressionValue(alreadyPaused, "paused(...)");
                    this.pausedPartitionsByUser.addAll(alreadyPaused);
                    this.consumer.pause(this.consumer.assignment());
                    if (tooManyDeferred) {
                        EventLoopKt.logger.debug("Paused - too many deferred commits");
                    } else if (this.isRetryingCommit.get()) {
                        EventLoopKt.logger.debug("Paused - commits are retrying");
                    } else {
                        EventLoopKt.logger.debug("Paused - back pressure");
                    }
                }
            } else if (this.awaitingTransaction.get()) {
                if (pauseAndWakeupIfNeeded()) {
                    Set<TopicPartition> alreadyPaused = this.consumer.paused();
                    Intrinsics.checkNotNullExpressionValue(alreadyPaused, "paused(...)");
                    this.pausedPartitionsByUser.addAll(alreadyPaused);
                    this.consumer.pause(this.consumer.assignment());
                    EventLoopKt.logger.debug("Paused - awaiting transaction");
                }
            } else if (this.isPaused.getAndSet(false)) {
                Set<TopicPartition> toResume = new HashSet<>(this.consumer.assignment());
                toResume.removeAll(this.pausedPartitionsByUser);
                this.pausedPartitionsByUser.clear();
                this.consumer.resume(toResume);
                EventLoopKt.logger.debug("Resumed partitions: {}", toResume);
            }

            ConsumerRecords<?, ?> records;
            try {
                records = this.consumer.poll(this.pollTimeout);
                Intrinsics.checkNotNull(records);
            } catch (WakeupException ignored) {
                // wakeup() was called to interrupt the blocking poll; continue with no records.
                EventLoopKt.logger.debug("Consumer woken");
                records = ConsumerRecords.empty();
                Intrinsics.checkNotNull(records);
            }
            if (records.isEmpty()) {
                return;
            }

            if (this.settings.getMaxDeferredCommits() > 0) {
                this.commitBatch.addUncommitted(records);
            }
            EventLoopKt.logger.debug("Attempting to send " + records.count() + " records to Channel");
            Object sendResult = this.channel.trySend-JP2dKIU(records);

            boolean sendFailed = sendResult instanceof ChannelResult.Failed;
            if (!sendFailed) {
                poll();
            }
            if (sendResult instanceof ChannelResult.Closed) {
                Throwable closeCause = ChannelResult.exceptionOrNull-impl(sendResult);
                EventLoopKt.logger.error("Channel closed when trying to send records.", closeCause);
            }
            if (sendFailed) {
                Throwable failure = ChannelResult.exceptionOrNull-impl(sendResult);
                if (failure == null) {
                    EventLoopKt.logger.debug("Back-pressuring kafka consumer. Might pause KafkaConsumer on next poll tick.");
                } else {
                    EventLoopKt.logger.error("Channel send failed when trying to send records.", failure);
                    closeChannel(failure);
                }
                this.isPolling.set(false);
                BuildersKt.launch$default(this.scope, this.outerContext, (CoroutineStart) null, new EventLoop$poll$3$1(this, records, null), 2, (Object) null);
            }
        } catch (Exception unexpected) {
            EventLoopKt.logger.error("Polling encountered an unexpected exception", unexpected);
            closeChannel(unexpected);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Forces a commit when a non-empty set of partitions is revoked, except in ATMOST_ONCE mode.
     *
     * @param collection the partitions being revoked
     */
    @ConsumerThread
    public final void partitionsRevoked(Collection<TopicPartition> collection) {
        if (!collection.isEmpty() && this.ackMode != AckMode.ATMOST_ONCE) {
            runCommitIfRequired(true);
        }
    }

    /** @return the batch that collects offsets awaiting commit for this event loop */
    @NotNull
    public final CommittableBatch getCommitBatch() {
        return commitBatch;
    }

    /**
     * Atomically clears the "retrying commit" flag and, only when the flag was previously set,
     * triggers another {@code poll()}.
     */
    @ConsumerThread
    private final void schedulePollAfterRetrying() {
        final boolean wasRetrying = this.isRetryingCommit.getAndSet(false);
        if (!wasRetrying) {
            return;
        }
        poll();
    }

    /**
     * Runs {@code commit()} on the calling thread when a commit is pending and no commit retry is
     * currently in flight.
     *
     * @param z when {@code true}, the commit is first marked as pending so that it is attempted even
     *          if nothing had requested one yet
     */
    @ConsumerThread
    private final void runCommitIfRequired(boolean z) {
        if (z) {
            this.commitPending.set(true);
        }
        // The retry flag is consulted first; the pending flag is only read when no retry is running.
        final boolean retryInFlight = this.isRetryingCommit.get();
        if (!retryInFlight && this.commitPending.get()) {
            commit();
        }
    }

    /**
     * Marks a commit as pending and launches the {@code EventLoop$scheduleCommitIfRequired$1}
     * coroutine on this loop's scope, unless a commit retry is running or a commit was already
     * pending.
     *
     * <p>The pending flag is flipped with a compare-and-set, so only the caller that actually
     * changes it from {@code false} to {@code true} launches the coroutine.
     */
    public final void scheduleCommitIfRequired() {
        if (this.isRetryingCommit.get()) {
            return;
        }
        final boolean claimedPending = this.commitPending.compareAndSet(false, true);
        if (claimedPending) {
            BuildersKt.launch$default(this.scope, (CoroutineContext) null, (CoroutineStart) null, new EventLoop$scheduleCommitIfRequired$1(this, null), 3, (Object) null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @ConsumerThread
    /**
     * Commits the offsets currently accumulated in the commit batch, if a commit is pending.
     *
     * <p>The pending flag is cleared atomically; when it was not set the method does nothing.
     * An empty offset map is reported straight to {@code commitSuccess}. Otherwise the ack mode
     * decides the path: mapping values 1 and 2 commit asynchronously, value 3 commits
     * synchronously, and any other value performs no commit.
     */
    public final void commit() {
        EventLoopKt.checkConsumerThread("commit");
        if (!this.commitPending.compareAndSet(true, false)) {
            return;
        }
        final CommittableBatch.CommitArgs pending = this.commitBatch.getAndClearOffsets();
        if (pending.getOffsets().isEmpty()) {
            commitSuccess(pending, pending.getOffsets());
            return;
        }
        final int modeIndex = WhenMappings.$EnumSwitchMapping$0[this.ackMode.ordinal()];
        if (modeIndex == 1 || modeIndex == 2) {
            commitAsync(pending);
        } else if (modeIndex == 3) {
            commitSync(pending);
        }
    }

    private final void commitAsync(CommittableBatch.CommitArgs commitArgs) {
        Object obj;
        Logger logger;
        try {
            Result.Companion companion = Result.Companion;
            EventLoop<K, V> eventLoop = this;
            eventLoop.asyncCommitsInProgress.incrementAndGet();
            logger = EventLoopKt.logger;
            logger.debug("Async committing: {}", commitArgs.getOffsets());
            eventLoop.consumer.commitAsync(commitArgs.getOffsets(), (v2, v3) -> {
                commitAsync$lambda$4$lambda$3(r2, r3, v2, v3);
            });
            eventLoop.poll();
            obj = Result.constructor-impl(Unit.INSTANCE);
        } catch (Throwable th) {
            Result.Companion companion2 = Result.Companion;
            obj = Result.constructor-impl(ResultKt.createFailure(th));
        }
        Throwable th2 = Result.exceptionOrNull-impl(obj);
        if (th2 != null) {
            try {
                Result.Companion companion3 = Result.Companion;
                this.asyncCommitsInProgress.decrementAndGet();
                commitFailure(commitArgs, th2);
                Result.constructor-impl(Unit.INSTANCE);
            } catch (Throwable th3) {
                Result.Companion companion4 = Result.Companion;
                Result.constructor-impl(ResultKt.createFailure(th3));
            }
        }
    }

    private final void commitSync(CommittableBatch.CommitArgs commitArgs) {
        Object obj;
        Logger logger;
        try {
            Result.Companion companion = Result.Companion;
            EventLoop<K, V> eventLoop = this;
            logger = EventLoopKt.logger;
            logger.debug("Sync committing: {}", commitArgs.getOffsets());
            eventLoop.consumer.commitSync(commitArgs.getOffsets());
            eventLoop.commitSuccess(commitArgs, commitArgs.getOffsets());
            eventLoop.utmostOnceOffsets.onCommit(commitArgs.getOffsets());
            obj = Result.constructor-impl(Unit.INSTANCE);
        } catch (Throwable th) {
            Result.Companion companion2 = Result.Companion;
            obj = Result.constructor-impl(ResultKt.createFailure(th));
        }
        Throwable th2 = Result.exceptionOrNull-impl(obj);
        if (th2 != null) {
            try {
                Result.Companion companion3 = Result.Companion;
                commitFailure(commitArgs, th2);
                Result.constructor-impl(Unit.INSTANCE);
            } catch (Throwable th3) {
                Result.Companion companion4 = Result.Companion;
                Result.constructor-impl(ResultKt.createFailure(th3));
            }
        }
    }

    /**
     * Completes a successful commit.
     *
     * <p>Resets the consecutive-failure counter when offsets were actually committed, resumes
     * polling if a retry had been in progress, and resumes every continuation waiting on this
     * commit with {@code Unit}.
     *
     * @param commitArgs the commit that finished; its continuations (possibly {@code null}) are resumed
     * @param map        the offsets that were committed; an empty map leaves the failure counter untouched
     */
    @ConsumerThread
    private final void commitSuccess(CommittableBatch.CommitArgs commitArgs, Map<TopicPartition, ? extends OffsetAndMetadata> map) {
        EventLoopKt.checkConsumerThread("commitSuccess");
        if (!map.isEmpty()) {
            this.consecutiveCommitFailures.set(0);
        }
        schedulePollAfterRetrying();
        List<Continuation<Unit>> continuations = commitArgs.getContinuations();
        if (continuations == null) {
            return;
        }
        for (Continuation<Unit> continuation : continuations) {
            // Seen from Java, a successful kotlin.Result is simply the wrapped value itself.
            continuation.resumeWith(Unit.INSTANCE);
        }
    }

    /**
     * Handles a failed commit, either scheduling a retry or giving up.
     *
     * <p>A retry happens only when the error is classified as retryable by {@code isRetryableCommit}
     * AND the number of consecutive failures (incremented here) is still below
     * {@code settings.getMaxCommitAttempts()}. The failure counter is only incremented for
     * retryable errors. In the retry case the offsets are restored into the batch, the commit is
     * marked pending and retrying, polling continues, and the {@code EventLoop$commitFailure$2}
     * coroutine is launched (presumably it re-attempts the commit after a delay; its body is not
     * visible here).
     *
     * <p>Otherwise the failure is final: polling is resumed if a retry had been in progress, and
     * either the channel is closed with the error (nobody is waiting on this commit) or the offsets
     * are restored and every waiting continuation is resumed with the failure.
     *
     * @param commitArgs the commit that failed
     * @param th         the cause of the failure
     */
    @ConsumerThread
    private final void commitFailure(CommittableBatch.CommitArgs commitArgs, Throwable th) {
        EventLoopKt.checkConsumerThread("commitFailure");
        Logger logger = EventLoopKt.logger;
        logger.warn("Commit failed", th);
        boolean retryable = ((Boolean) this.isRetryableCommit.invoke(th)).booleanValue();
        // Retry only while the error is retryable and attempts remain. The previous condition
        // (retryable || failures >= max) retried retryable errors forever without counting them
        // and retried non-retryable errors once the budget was exhausted.
        if (retryable && this.consecutiveCommitFailures.incrementAndGet() < this.settings.getMaxCommitAttempts()) {
            this.commitBatch.restoreOffsets(commitArgs, true);
            logger.warn("Commit failed with exception " + th + ", retries remaining " + (this.settings.getMaxCommitAttempts() - this.consecutiveCommitFailures.get()));
            this.commitPending.set(true);
            this.isRetryingCommit.set(true);
            poll();
            BuildersKt.launch$default(this.scope, (CoroutineContext) null, CoroutineStart.UNDISPATCHED, new EventLoop$commitFailure$2(this, null), 1, (Object) null);
            return;
        }
        logger.debug("Commit failed with exception " + th + ", zero retries remaining");
        schedulePollAfterRetrying();
        List<Continuation<Unit>> continuations = commitArgs.getContinuations();
        if (continuations == null || continuations.isEmpty()) {
            // Nobody awaits this commit, so surface the error by closing the channel.
            closeChannel(th);
            return;
        }
        this.commitPending.set(false);
        this.commitBatch.restoreOffsets(commitArgs, false);
        for (Continuation<Unit> continuation : continuations) {
            continuation.resumeWith(ResultKt.createFailure(th));
        }
    }

    /**
     * Suspending close: runs the {@code EventLoop$close$2} block in this loop's coroutine context.
     *
     * @param j            value forwarded unchanged to {@code EventLoop$close$2}
     * @param continuation the caller's continuation
     * @return the {@code COROUTINE_SUSPENDED} marker when the call suspended, otherwise {@code Unit}
     */
    @Nullable
    /* renamed from: close-VtjQ1oo, reason: not valid java name */
    public final Object m79closeVtjQ1oo(long j, @NotNull Continuation<? super Unit> continuation) {
        final Object outcome = BuildersKt.withContext(this.scope.getCoroutineContext(), new EventLoop$close$2(this, j, null), continuation);
        if (outcome == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
            return outcome;
        }
        return Unit.INSTANCE;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Flushes outstanding commits and closes the Kafka consumer, retrying when interrupted by a
     * {@code WakeupException}.
     *
     * <p>Unless the ack mode is {@code EXACTLY_ONCE}, a commit is run and the consumer is polled in
     * 1 ms steps until no async commit is in progress or the deadline has passed. The consumer is
     * then closed with whatever time is left (never negative).
     *
     * @param j deadline as an epoch-millisecond timestamp (it is compared against
     *          {@code System.currentTimeMillis()})
     * @param i number of additional attempts allowed after a {@code WakeupException}; at zero the
     *          exception is rethrown
     */
    @ConsumerThread
    public final void commitOnClose(long j, int i) {
        try {
            boolean forceCommit = true;
            if (WhenMappings.$EnumSwitchMapping$0[this.ackMode.ordinal()] == 3) {
                forceCommit = this.utmostOnceOffsets.undoCommitAhead(this.commitBatch);
            }
            if (this.ackMode != AckMode.EXACTLY_ONCE) {
                runCommitIfRequired(forceCommit);
                // Keep the consumer ticking so pending async commit callbacks can complete.
                while (this.asyncCommitsInProgress.get() > 0 && j - System.currentTimeMillis() > 0) {
                    this.consumer.poll(Duration.ofMillis(1L));
                }
            }
            final long remainingMillis = j - System.currentTimeMillis();
            this.consumer.close(Duration.ofMillis(RangesKt.coerceAtLeast(remainingMillis, 0L)));
        } catch (WakeupException e) {
            if (i != 0) {
                commitOnClose(j, i - 1);
                return;
            }
            throw e;
        }
    }

    /**
     * Logs, at TRACE level, the consumer's current position for each given partition together with
     * the committed offsets of those partitions.
     *
     * <p>Does nothing when TRACE logging is disabled. Each lookup uses a 5-second timeout. Any
     * exception raised while querying the consumer is logged as an error and not propagated.
     *
     * <p>Reconstructed from the bytecode listing the decompiler emitted for this method; the
     * previous body unconditionally threw {@code UnsupportedOperationException}.
     *
     * @param partitions the partitions to report on
     */
    /* JADX INFO: Access modifiers changed from: private */
    @ConsumerThread
    public final void traceCommitted(Collection<TopicPartition> partitions) {
        Logger logger = EventLoopKt.logger;
        if (!logger.isTraceEnabled()) {
            return;
        }
        try {
            List<String> positions = new java.util.ArrayList<>(partitions.size());
            for (TopicPartition partition : partitions) {
                positions.add(partition + " position: " + this.consumer.position(partition, Duration.ofSeconds(5L)));
            }
            Map<TopicPartition, OffsetAndMetadata> committed = this.consumer.committed(CollectionsKt.toSet(partitions), Duration.ofSeconds(5L));
            logger.trace("positions: " + positions + ", committed: " + committed);
        } catch (Exception e) {
            logger.error("Failed to get positions or committed", e);
        }
    }

    /**
     * Builds the {@link Offset} handle for a consumed record.
     *
     * <p>The handle is a {@code CommitOffset} bound to this event loop, the record's topic-partition
     * and offset, the size derived from the configured commit strategy, and the loop's
     * {@code commitBatchSignal}.
     *
     * @param consumerRecord the record to create an offset handle for; must not be {@code null}
     * @return a new {@code CommitOffset} for that record
     */
    @NotNull
    public final Offset offsetFromRecord$kotlin_kafka(@NotNull ConsumerRecord<K, V> consumerRecord) {
        Intrinsics.checkNotNullParameter(consumerRecord, "record");
        final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        final long recordOffset = consumerRecord.offset();
        return new CommitOffset(
                this,
                topicPartition,
                recordOffset,
                CommitStrategyKt.size(this.settings.getCommitStrategy()),
                this.commitBatchSignal);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the records channel with the given cause.
     *
     * <p>When {@code channel.close} returns {@code false}, the throwable is instead handed to the
     * coroutine exception handler of the outer context.
     *
     * @param th the cause to close the channel with
     */
    public final void closeChannel(Throwable th) {
        final boolean closedByThisCall = this.channel.close(th);
        if (closedByThisCall) {
            return;
        }
        CoroutineExceptionHandlerKt.handleCoroutineException(this.outerContext, th);
    }

    // Empty synthetic method with no behaviour of its own.
    // NOTE(review): presumably the Kotlin-compiler-generated holder for annotations declared on a
    // `commitManager` property — confirm against the Kotlin source before removing.
    private static /* synthetic */ void getCommitManager$annotations() {
    }

    /**
     * Completion callback body for the async commit issued by {@code commitAsync}.
     *
     * <p>Decrements the in-progress counter, then reports the outcome: a {@code null} exception means
     * success (the offsets map must then be non-null), anything else is a failure.
     *
     * @param eventLoop  the event loop that issued the commit
     * @param commitArgs the commit arguments the request was made with
     * @param map        the committed offsets, as passed to the callback
     * @param exc        the commit error, or {@code null} on success
     */
    private static final void commitAsync$lambda$4$lambda$3(EventLoop eventLoop, CommittableBatch.CommitArgs commitArgs, Map map, Exception exc) {
        Intrinsics.checkNotNullParameter(eventLoop, "$this_runCatching");
        Intrinsics.checkNotNullParameter(commitArgs, "$commitArgs");
        eventLoop.asyncCommitsInProgress.decrementAndGet();
        if (exc == null) {
            Intrinsics.checkNotNull(map);
            eventLoop.commitSuccess(commitArgs, map);
            return;
        }
        eventLoop.commitFailure(commitArgs, exc);
    }
}
