package io.github.nomisRev.kafka.receiver.internals;

import io.github.nomisRev.kafka.receiver.ReceiverSettings;
import io.github.nomisRev.kafka.receiver.internals.CommittableBatch;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import kotlin.Metadata;
import kotlin.Result;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.Job;
import kotlinx.coroutines.channels.BufferOverflow;
import kotlinx.coroutines.channels.Channel;
import kotlinx.coroutines.channels.ChannelKt;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;

/* compiled from: PollLoop.kt */
@Metadata(mv = {1, 7, 1}, k = 1, xi = 48, d1 = {"��º\u0001\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u0003\n\u0002\u0010\u000b\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010#\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010$\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u001e\n��\n\u0002\u0018\u0002\n\u0002\b\u0006\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0010\t\n��\b��\u0018��*\u0004\b��\u0010\u0001*\u0004\b\u0001\u0010\u00022\u00020\u0003Bi\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0012\u0010\u0006\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u0007\u0012\u0012\u0010\b\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\t\u0012\u0012\u0010\n\u001a\u000e\u0012\u0004\u0012\u00020\f\u0012\u0004\u0012\u00020\r0\u000b\u0012\u0006\u0010\u000e\u001a\u00020\u000f\u0012\u0006\u0010\u0010\u001a\u00020\u0011\u0012\u0006\u0010\u0012\u001a\u00020\u0011\u0012\u0006\u0010\u0013\u001a\u00020\u0014¢\u0006\u0002\u0010\u0015J\b\u0010,\u001a\u00020\rH\u0002J!\u0010-\u001a\u00020.2\u0006\u0010/\u001a\u000200H\u0086@ø\u0001��ø\u0001\u0001ø\u0001\u0001¢\u0006\u0004\b1\u00102J\b\u00103\u001a\u00020.H\u0002J\u001c\u00104\u001a\u00020.2\u0006\u00105\u001a\u0002062\n\u00107\u001a\u000608j\u0002`9H\u0002J&\u0010:\u001a\u00020.2\b\u00105\u001a\u0004\u0018\u0001062\u0012\u0010;\u001a\u000e\u0012\u0004\u0012\u00020&\u0012\u0004\u0012\u00020=0<H\u0002J\u0016\u0010>\u001a\u00020.2\f\u0010?\u001a\b\u0012\u0004\u0012\u00020&0@H\u0002J\n\u0010A\u001a\u0004\u0018\u00010BH\u0002J\u0010\u0010C\u001a\u00020.2\u0006\u0010D\u001a\u00020\rH\u0002J\b\u0010E\u001a\u0004\u0018\u00010BJ\b\u0010F\u001a\u0004\u0018\u00010BJ\u0014\u0010G\u001a\u00020B2\f\u0010H\u001a\b\u0012\u0004\u0012\u00020I0@J\u0010\u0010J\u001a\u00020.2\u0006\u0010K\u001a\u00020LH\u0002R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0013\u001a\u00020\u0014X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0012\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R#\u0010\u0016\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u00180\u0017¢\u0006\b\n��\u001a\u0004\b\u0019\u0010\u001aR\u0011\u0010\u001b\u001a\u00020\u001c¢\u0006\b\n��\u001a\u0004\b\u001d\u0010\u001eR\u000e\u0010\u001f\u001a\u00020 X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\b\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010!\u001a\u00020 X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\"\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\n\u001a\u000e\u0012\u0004\u0012\u00020\f\u0012\u0004\u0012\u00020\r0\u000bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010#\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010$\u001a\b\u0012\u0004\u0012\u00020&0%X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010'\u001a\u00020(X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010)\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010*\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010+\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u0006\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010\u0007X\u0082\u0004¢\u0006\u0002\n��\u0082\u0002\u000b\n\u0005\b¡\u001e0\u0001\n\u0002\b\u0019¨\u0006M"}, d2 = {"Lio/github/nomisRev/kafka/receiver/internals/EventLoop;", "K", "V", "", "ackMode", "Lio/github/nomisRev/kafka/receiver/internals/AckMode;", "settings", "Lio/github/nomisRev/kafka/receiver/ReceiverSettings;", "consumer", "Lorg/apache/kafka/clients/consumer/KafkaConsumer;", "isRetriableException", "Lkotlin/Function1;", "", "", "scope", "Lkotlinx/coroutines/CoroutineScope;", "isActive", "Ljava/util/concurrent/atomic/AtomicBoolean;", "awaitingTransaction", "atmostOnceOffsets", "Lio/github/nomisRev/kafka/receiver/internals/AtmostOnceOffsets;", "(Lio/github/nomisRev/kafka/receiver/internals/AckMode;Lio/github/nomisRev/kafka/receiver/ReceiverSettings;Lorg/apache/kafka/clients/consumer/KafkaConsumer;Lkotlin/jvm/functions/Function1;Lkotlinx/coroutines/CoroutineScope;Ljava/util/concurrent/atomic/AtomicBoolean;Ljava/util/concurrent/atomic/AtomicBoolean;Lio/github/nomisRev/kafka/receiver/internals/AtmostOnceOffsets;)V", "channel", "Lkotlinx/coroutines/channels/Channel;", "Lorg/apache/kafka/clients/consumer/ConsumerRecords;", "getChannel", "()Lkotlinx/coroutines/channels/Channel;", "commitBatch", "Lio/github/nomisRev/kafka/receiver/internals/CommittableBatch;", "getCommitBatch", "()Lio/github/nomisRev/kafka/receiver/internals/CommittableBatch;", "consecutiveCommitFailures", "Ljava/util/concurrent/atomic/AtomicInteger;", "inProgress", "isPending", "pausedByUs", "pausedByUser", "", "Lorg/apache/kafka/common/TopicPartition;", "pollTimeout", "Ljava/time/Duration;", "requesting", "retrying", "scheduled", "checkAndSetPausedByUs", "close", "", "timeout", "Lkotlin/time/Duration;", "close-VtjQ1oo", "(JLkotlin/coroutines/Continuation;)Ljava/lang/Object;", "commit", "commitFailure", "commitArgs", "Lio/github/nomisRev/kafka/receiver/internals/CommittableBatch$CommitArgs;", "exception", "Ljava/lang/Exception;", "Lkotlin/Exception;", "commitSuccess", "offsets", "", "Lorg/apache/kafka/clients/consumer/OffsetAndMetadata;", "onPartitionsRevoked", "partitions", "", "pollTaskAfterRetry", "Lkotlinx/coroutines/Job;", "runCommitIfRequired", "force", "scheduleCommitIfRequired", "schedulePoll", "subscriber", "topicNames", "", "waitFor", "endTimeMillis", "", "kotlin-kafka"})
/* loaded from: input_file:io/github/nomisRev/kafka/receiver/internals/EventLoop.class */
public final class EventLoop<K, V> {

    @NotNull
    private final AckMode ackMode;

    @NotNull
    private final ReceiverSettings<K, V> settings;

    @NotNull
    private final KafkaConsumer<K, V> consumer;

    @NotNull
    private final Function1<Throwable, Boolean> isRetriableException;

    @NotNull
    private final CoroutineScope scope;

    @NotNull
    private final AtomicBoolean isActive;

    @NotNull
    private final AtomicBoolean awaitingTransaction;

    @NotNull
    private final AtmostOnceOffsets atmostOnceOffsets;

    @NotNull
    private final AtomicBoolean requesting;

    @NotNull
    private final AtomicBoolean pausedByUs;

    @NotNull
    private final Channel<ConsumerRecords<K, V>> channel;

    @NotNull
    private final Duration pollTimeout;

    @NotNull
    private final AtomicBoolean scheduled;

    @NotNull
    private final Set<TopicPartition> pausedByUser;

    @NotNull
    private final CommittableBatch commitBatch;

    @NotNull
    private final AtomicBoolean isPending;

    @NotNull
    private final AtomicInteger inProgress;

    @NotNull
    private final AtomicInteger consecutiveCommitFailures;

    @NotNull
    private final AtomicBoolean retrying;

    /* compiled from: PollLoop.kt */
    @Metadata(mv = {1, 7, 1}, k = 3, xi = 48)
    /* loaded from: input_file:io/github/nomisRev/kafka/receiver/internals/EventLoop$WhenMappings.class */
    public /* synthetic */ class WhenMappings {
        public static final /* synthetic */ int[] $EnumSwitchMapping$0;

        static {
            int[] iArr = new int[AckMode.values().length];
            iArr[AckMode.MANUAL_ACK.ordinal()] = 1;
            iArr[AckMode.AUTO_ACK.ordinal()] = 2;
            iArr[AckMode.ATMOST_ONCE.ordinal()] = 3;
            iArr[AckMode.EXACTLY_ONCE.ordinal()] = 4;
            $EnumSwitchMapping$0 = iArr;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public EventLoop(@NotNull AckMode ackMode, @NotNull ReceiverSettings<K, V> receiverSettings, @NotNull KafkaConsumer<K, V> kafkaConsumer, @NotNull Function1<? super Throwable, Boolean> function1, @NotNull CoroutineScope coroutineScope, @NotNull AtomicBoolean atomicBoolean, @NotNull AtomicBoolean atomicBoolean2, @NotNull AtmostOnceOffsets atmostOnceOffsets) {
        Intrinsics.checkNotNullParameter(ackMode, "ackMode");
        Intrinsics.checkNotNullParameter(receiverSettings, "settings");
        Intrinsics.checkNotNullParameter(kafkaConsumer, "consumer");
        Intrinsics.checkNotNullParameter(function1, "isRetriableException");
        Intrinsics.checkNotNullParameter(coroutineScope, "scope");
        Intrinsics.checkNotNullParameter(atomicBoolean, "isActive");
        Intrinsics.checkNotNullParameter(atomicBoolean2, "awaitingTransaction");
        Intrinsics.checkNotNullParameter(atmostOnceOffsets, "atmostOnceOffsets");
        this.ackMode = ackMode;
        this.settings = receiverSettings;
        this.consumer = kafkaConsumer;
        this.isRetriableException = function1;
        this.scope = coroutineScope;
        this.isActive = atomicBoolean;
        this.awaitingTransaction = atomicBoolean2;
        this.atmostOnceOffsets = atmostOnceOffsets;
        this.requesting = new AtomicBoolean(true);
        this.pausedByUs = new AtomicBoolean(false);
        this.channel = ChannelKt.Channel$default(0, (BufferOverflow) null, (Function1) null, 7, (Object) null);
        Duration ofSeconds = Duration.ofSeconds(kotlin.time.Duration.getInWholeSeconds-impl(this.settings.m39getPollTimeoutUwyO8pc()), kotlin.time.Duration.getNanosecondsComponent-impl(r1));
        Intrinsics.checkNotNullExpressionValue(ofSeconds, "toJavaDuration-LRDsOJo");
        this.pollTimeout = ofSeconds;
        this.scheduled = new AtomicBoolean();
        this.pausedByUser = new HashSet();
        this.commitBatch = new CommittableBatch();
        this.isPending = new AtomicBoolean();
        this.inProgress = new AtomicInteger();
        this.consecutiveCommitFailures = new AtomicInteger();
        this.retrying = new AtomicBoolean();
    }

    @NotNull
    public final Channel<ConsumerRecords<K, V>> getChannel() {
        return this.channel;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void onPartitionsRevoked(Collection<TopicPartition> collection) {
        if (collection.isEmpty() || this.ackMode == AckMode.ATMOST_ONCE) {
            return;
        }
        runCommitIfRequired(true);
    }

    @NotNull
    public final Job subscriber(@NotNull Collection<String> collection) {
        Intrinsics.checkNotNullParameter(collection, "topicNames");
        return BuildersKt.launch$default(this.scope, (CoroutineContext) null, (CoroutineStart) null, new EventLoop$subscriber$1(this, collection, null), 3, (Object) null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final boolean checkAndSetPausedByUs() {
        Logger logger;
        logger = PollLoopKt.logger;
        logger.debug("checkAndSetPausedByUs");
        boolean z = !this.pausedByUs.getAndSet(true);
        if (z && this.requesting.get() && !this.retrying.get()) {
            this.consumer.wakeup();
        }
        return z;
    }

    @Nullable
    public final Job schedulePoll() {
        if (this.scheduled.getAndSet(true)) {
            return null;
        }
        return BuildersKt.launch$default(this.scope, (CoroutineContext) null, (CoroutineStart) null, new EventLoop$schedulePoll$1(this, null), 3, (Object) null);
    }

    @NotNull
    public final CommittableBatch getCommitBatch() {
        return this.commitBatch;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void commit() {
        Logger logger;
        Logger logger2;
        Logger logger3;
        if (this.isPending.compareAndSet(true, false)) {
            CommittableBatch.CommitArgs andClearOffsets = this.commitBatch.getAndClearOffsets();
            try {
            } catch (Exception e) {
                logger = PollLoopKt.logger;
                logger.error("Unexpected exception", e);
                commitFailure(andClearOffsets, e);
            }
            if (andClearOffsets.getOffsets().isEmpty()) {
                commitSuccess(andClearOffsets, andClearOffsets.getOffsets());
                return;
            }
            switch (WhenMappings.$EnumSwitchMapping$0[this.ackMode.ordinal()]) {
                case 1:
                case 2:
                    this.inProgress.incrementAndGet();
                    try {
                        logger3 = PollLoopKt.logger;
                        logger3.debug("Async committing: " + andClearOffsets.getOffsets());
                        this.consumer.commitAsync(andClearOffsets.getOffsets(), (v2, v3) -> {
                            m56commit$lambda0(r2, r3, v2, v3);
                        });
                        schedulePoll();
                        return;
                    } catch (Throwable th) {
                        this.inProgress.decrementAndGet();
                        throw th;
                    }
                case 3:
                    logger2 = PollLoopKt.logger;
                    logger2.debug("Sync committing: " + andClearOffsets.getOffsets());
                    this.consumer.commitSync(andClearOffsets.getOffsets());
                    commitSuccess(andClearOffsets, andClearOffsets.getOffsets());
                    this.atmostOnceOffsets.onCommit(andClearOffsets.getOffsets());
                    return;
                case 4:
                default:
                    return;
            }
            logger = PollLoopKt.logger;
            logger.error("Unexpected exception", e);
            commitFailure(andClearOffsets, e);
        }
    }

    private final void commitSuccess(CommittableBatch.CommitArgs commitArgs, Map<TopicPartition, ? extends OffsetAndMetadata> map) {
        List<Continuation<Unit>> continuations;
        if (!map.isEmpty()) {
            this.consecutiveCommitFailures.set(0);
        }
        pollTaskAfterRetry();
        if (commitArgs == null || (continuations = commitArgs.getContinuations()) == null) {
            return;
        }
        Iterator<T> it = continuations.iterator();
        while (it.hasNext()) {
            Continuation continuation = (Continuation) it.next();
            Result.Companion companion = Result.Companion;
            continuation.resumeWith(Result.constructor-impl(Unit.INSTANCE));
        }
    }

    private final Job pollTaskAfterRetry() {
        if (this.retrying.getAndSet(false)) {
            return schedulePoll();
        }
        return null;
    }

    private final void commitFailure(CommittableBatch.CommitArgs commitArgs, Exception exc) {
        Logger logger;
        Logger logger2;
        Logger logger3;
        logger = PollLoopKt.logger;
        logger.warn("Commit failed", exc);
        if (((Boolean) this.isRetriableException.invoke(exc)).booleanValue() || this.consecutiveCommitFailures.incrementAndGet() >= this.settings.getMaxCommitAttempts()) {
            this.commitBatch.restoreOffsets(commitArgs, true);
            logger2 = PollLoopKt.logger;
            logger2.warn("Commit failed with exception " + exc + ", retries remaining " + (this.settings.getMaxCommitAttempts() - this.consecutiveCommitFailures.get()));
            this.isPending.set(true);
            this.retrying.set(true);
            schedulePoll();
            BuildersKt.launch$default(this.scope, (CoroutineContext) null, (CoroutineStart) null, new EventLoop$commitFailure$2(this, null), 3, (Object) null);
            return;
        }
        logger3 = PollLoopKt.logger;
        logger3.debug("Cannot retry");
        pollTaskAfterRetry();
        List<Continuation<Unit>> continuations = commitArgs.getContinuations();
        List<Continuation<Unit>> list = continuations;
        if (list == null || list.isEmpty()) {
            this.channel.close(exc);
            return;
        }
        this.isPending.set(false);
        this.commitBatch.restoreOffsets(commitArgs, false);
        Iterator<T> it = continuations.iterator();
        while (it.hasNext()) {
            Continuation continuation = (Continuation) it.next();
            Result.Companion companion = Result.Companion;
            continuation.resumeWith(Result.constructor-impl(ResultKt.createFailure(exc)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void runCommitIfRequired(boolean z) {
        if (z) {
            this.isPending.set(true);
        }
        if (this.retrying.get() || !this.isPending.get()) {
            return;
        }
        commit();
    }

    @Nullable
    public final Job scheduleCommitIfRequired() {
        if (this.isActive.get() && !this.retrying.get() && this.isPending.compareAndSet(false, true)) {
            return BuildersKt.launch$default(this.scope, (CoroutineContext) null, (CoroutineStart) null, new EventLoop$scheduleCommitIfRequired$1(this, null), 3, (Object) null);
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void waitFor(long j) {
        while (this.inProgress.get() > 0 && j - System.currentTimeMillis() > 0) {
            this.consumer.poll(Duration.ofMillis(1L));
        }
    }

    @Nullable
    /* renamed from: close-VtjQ1oo, reason: not valid java name */
    public final Object m55closeVtjQ1oo(long j, @NotNull Continuation<? super Unit> continuation) {
        Object withContext = BuildersKt.withContext(this.scope.getCoroutineContext(), new EventLoop$close$2(j, this, null), continuation);
        return withContext == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? withContext : Unit.INSTANCE;
    }

    /* renamed from: commit$lambda-0, reason: not valid java name */
    private static final void m56commit$lambda0(EventLoop eventLoop, CommittableBatch.CommitArgs commitArgs, Map map, Exception exc) {
        Intrinsics.checkNotNullParameter(eventLoop, "this$0");
        Intrinsics.checkNotNullParameter(commitArgs, "$commitArgs");
        eventLoop.inProgress.decrementAndGet();
        if (exc != null) {
            eventLoop.commitFailure(commitArgs, exc);
        } else {
            Intrinsics.checkNotNullExpressionValue(map, "offsets");
            eventLoop.commitSuccess(commitArgs, map);
        }
    }
}
