package io.github.jchapuis.fs2.kafka.mock.impl;

import cats.effect.IO;
import cats.effect.IO$;
import cats.effect.IOPlatform;
import cats.effect.kernel.Ref;
import cats.effect.std.Mutex;
import cats.effect.unsafe.IORuntime;
import fs2.kafka.GenericSerializer;
import fs2.kafka.Headers$;
import fs2.kafka.Key;
import fs2.kafka.Value;
import fs2.kafka.consumer.MkConsumer;
import io.github.jchapuis.fs2.kafka.mock.MockKafkaConsumer;
import java.time.Instant;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import scala.$less$colon$less$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.jdk.CollectionConverters$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyVals$;

/* compiled from: NativeMockKafkaConsumer.scala */
/* loaded from: input_file:io/github/jchapuis/fs2/kafka/mock/impl/NativeMockKafkaConsumer.class */
public class NativeMockKafkaConsumer implements MockKafkaConsumer {
    public static final long OFFSET$0 = LazyVals$.MODULE$.getOffsetStatic(NativeMockKafkaConsumer.class.getDeclaredField("0bitmap$1"));

    /* renamed from: 0bitmap$1, reason: not valid java name */
    public long f10bitmap$1;
    private final MockConsumer mockConsumer;
    private final Ref<IO, Map<String, Object>> currentOffsets;
    private final Mutex<IO> mutex;
    private final IORuntime IORuntime;
    public final int io$github$jchapuis$fs2$kafka$mock$impl$NativeMockKafkaConsumer$$singlePartition = 0;
    public MkConsumer mkConsumer$lzy1;

    public NativeMockKafkaConsumer(MockConsumer<byte[], byte[]> mockConsumer, Ref<IO, Map<String, Object>> ref, Mutex<IO> mutex, IORuntime iORuntime) {
        this.mockConsumer = mockConsumer;
        this.currentOffsets = ref;
        this.mutex = mutex;
        this.IORuntime = iORuntime;
    }

    @Override // io.github.jchapuis.fs2.kafka.mock.MockKafkaConsumer
    public /* bridge */ /* synthetic */ Option publish$default$4() {
        Option publish$default$4;
        publish$default$4 = publish$default$4();
        return publish$default$4;
    }

    public MockConsumer<byte[], byte[]> mockConsumer() {
        return this.mockConsumer;
    }

    private IO<Object> incrementOffset(String str) {
        return ((IO) this.currentOffsets.get()).flatMap(map -> {
            return ((IO) this.currentOffsets.updateAndGet(map -> {
                return map.updated(str, BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(map.apply(str)) + 1));
            })).map(map2 -> {
                return BoxesRunTime.unboxToLong(map2.apply(str));
            });
        });
    }

    @Override // io.github.jchapuis.fs2.kafka.mock.MockKafkaConsumer
    public <K, V> IO<BoxedUnit> publish(String str, K k, V v, Option<Instant> option, GenericSerializer<Key, IO, K> genericSerializer, GenericSerializer<Value, IO, V> genericSerializer2) {
        return ((IO) genericSerializer.serialize(str, Headers$.MODULE$.empty(), k)).flatMap(bArr -> {
            return ((IO) genericSerializer2.serialize(str, Headers$.MODULE$.empty(), v)).flatMap(bArr -> {
                return addRecord(str, bArr, Some$.MODULE$.apply(bArr), option).map(boxedUnit -> {
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                });
            });
        });
    }

    private IO<BoxedUnit> waitForConsumerToBeAssignedTo(String str) {
        return IO$.MODULE$.apply(this::waitForConsumerToBeAssignedTo$$anonfun$1).flatMap(set -> {
            return set.contains(str) ? IO$.MODULE$.unit() : IO$.MODULE$.sleep(new package.DurationInt(package$.MODULE$.DurationInt(100)).millis()).$greater$greater(() -> {
                return r1.waitForConsumerToBeAssignedTo$$anonfun$2$$anonfun$1(r2);
            });
        });
    }

    private IO<BoxedUnit> addRecord(String str, byte[] bArr, Option<byte[]> option, Option<Instant> option2) {
        return waitForConsumerToBeAssignedTo(str).$greater$greater(() -> {
            return r1.addRecord$$anonfun$1(r2, r3, r4, r5);
        });
    }

    @Override // io.github.jchapuis.fs2.kafka.mock.MockKafkaConsumer
    public <K> IO<BoxedUnit> redact(String str, K k, GenericSerializer<Key, IO, K> genericSerializer) {
        return ((IO) genericSerializer.serialize(str, Headers$.MODULE$.empty(), k)).flatMap(bArr -> {
            return addRecord(str, bArr, None$.MODULE$, None$.MODULE$).map(boxedUnit -> {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            });
        });
    }

    public <T> T io$github$jchapuis$fs2$kafka$mock$impl$NativeMockKafkaConsumer$$withMutex(Function0<T> function0) {
        return (T) ((IOPlatform) this.mutex.lock().surround(IO$.MODULE$.apply(function0), IO$.MODULE$.asyncForIO())).unsafeRunSync(this.IORuntime);
    }

    /* JADX WARN: Unreachable blocks removed: 4, instructions: 4 */
    @Override // io.github.jchapuis.fs2.kafka.mock.MockKafkaConsumer
    public MkConsumer<IO> mkConsumer() {
        while (true) {
            long j = LazyVals$.MODULE$.get(this, OFFSET$0);
            long STATE = LazyVals$.MODULE$.STATE(j, 0);
            if (STATE == 3) {
                return this.mkConsumer$lzy1;
            }
            if (STATE != 0) {
                LazyVals$.MODULE$.wait4Notification(this, OFFSET$0, j, 0);
            } else if (LazyVals$.MODULE$.CAS(this, OFFSET$0, j, 1, 0)) {
                try {
                    NativeMockKafkaConsumer$$anon$1 nativeMockKafkaConsumer$$anon$1 = new NativeMockKafkaConsumer$$anon$1(this);
                    this.mkConsumer$lzy1 = nativeMockKafkaConsumer$$anon$1;
                    LazyVals$.MODULE$.setFlag(this, OFFSET$0, 3, 0);
                    return nativeMockKafkaConsumer$$anon$1;
                } catch (Throwable th) {
                    LazyVals$.MODULE$.setFlag(this, OFFSET$0, 0, 0);
                    throw th;
                }
            }
        }
    }

    private final Set waitForConsumerToBeAssignedTo$$anonfun$1() {
        return ((IterableOnceOps) CollectionConverters$.MODULE$.SetHasAsScala(mockConsumer().assignment()).asScala().map(topicPartition -> {
            return topicPartition.topic();
        })).toSet();
    }

    private final IO waitForConsumerToBeAssignedTo$$anonfun$2$$anonfun$1(String str) {
        return waitForConsumerToBeAssignedTo(str);
    }

    private static final IO addRecord$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$2() {
        return IO$.MODULE$.realTimeInstant();
    }

    private static final TimestampType $anonfun$2() {
        return TimestampType.LOG_APPEND_TIME;
    }

    private static final int $anonfun$4() {
        return 0;
    }

    private final /* synthetic */ Tuple2 addRecord$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$4(String str, byte[] bArr, Option option, Option option2, long j, long j2) {
        return Tuple2$.MODULE$.apply(BoxesRunTime.boxToLong(j2), new ConsumerRecord(str, this.io$github$jchapuis$fs2$kafka$mock$impl$NativeMockKafkaConsumer$$singlePartition, j, j2, (TimestampType) option2.map(instant -> {
            return TimestampType.CREATE_TIME;
        }).getOrElse(NativeMockKafkaConsumer::$anonfun$2), bArr.length, BoxesRunTime.unboxToInt(option.map(bArr2 -> {
            return bArr2.length;
        }).getOrElse(NativeMockKafkaConsumer::$anonfun$4)), bArr, option.orNull($less$colon$less$.MODULE$.refl()), new RecordHeaders(), Optional.empty()));
    }

    private final void addRecord$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$5$$anonfun$1(ConsumerRecord consumerRecord) {
        mockConsumer().addRecord(consumerRecord);
    }

    private final /* synthetic */ IO addRecord$$anonfun$1$$anonfun$1$$anonfun$1(String str, byte[] bArr, Option option, Option option2, long j) {
        return ((IO) option2.map(instant -> {
            return IO$.MODULE$.pure(instant);
        }).getOrElse(NativeMockKafkaConsumer::addRecord$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$2)).map(instant2 -> {
            return instant2.toEpochMilli();
        }).map(obj -> {
            return addRecord$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$4(str, bArr, option, option2, j, BoxesRunTime.unboxToLong(obj));
        }).flatMap(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            BoxesRunTime.unboxToLong(tuple2._1());
            ConsumerRecord consumerRecord = (ConsumerRecord) tuple2._2();
            return IO$.MODULE$.apply(() -> {
                addRecord$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$5$$anonfun$1(consumerRecord);
                return BoxedUnit.UNIT;
            }).map(boxedUnit -> {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            });
        });
    }

    private final IO addRecord$$anonfun$1(String str, byte[] bArr, Option option, Option option2) {
        return (IO) this.mutex.lock().surround(IO$.MODULE$.uncancelable(poll -> {
            return incrementOffset(str).flatMap(obj -> {
                return addRecord$$anonfun$1$$anonfun$1$$anonfun$1(str, bArr, option, option2, BoxesRunTime.unboxToLong(obj));
            });
        }), IO$.MODULE$.asyncForIO());
    }

    public static final /* synthetic */ Tuple2 io$github$jchapuis$fs2$kafka$mock$impl$NativeMockKafkaConsumer$$anon$2$$_$offsetsForTimes$$anonfun$1$$anonfun$1(java.util.Map map, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        TopicPartition topicPartition = (TopicPartition) tuple2._1();
        Long l = (Long) tuple2._2();
        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((TopicPartition) Predef$.MODULE$.ArrowAssoc(topicPartition), new OffsetAndTimestamp(Predef$.MODULE$.Long2long(l), Predef$.MODULE$.Long2long((Long) map.get(topicPartition))));
    }
}
