package fs2.kafka.internal;

import cats.effect.Blocker;
import cats.effect.Concurrent;
import cats.effect.ContextShift;
import cats.effect.Resource;
import cats.effect.Resource$;
import cats.effect.ResourceLike;
import cats.effect.concurrent.Semaphore;
import cats.effect.concurrent.Semaphore$;
import cats.implicits$;
import fs2.kafka.ConsumerSettings;
import java.io.Serializable;
import org.apache.kafka.clients.consumer.Consumer;
import scala.Function2;
import scala.Tuple2$;
import scala.concurrent.ExecutionContext;
import scala.runtime.ModuleSerializationProxy;

/* compiled from: WithConsumer.scala */
/* loaded from: input_file:fs2/kafka/internal/WithConsumer$.class */
/**
 * Singleton holder ({@link #MODULE$}) exposing a factory that builds a
 * {@code Resource} of {@code WithConsumer}.
 *
 * <p>NOTE(review): this looks like decompiler output for a Scala companion
 * object (the {@code MODULE$} field, {@code $anonfun$...} helpers and
 * {@code ModuleSerializationProxy} usage suggest so). Several identifiers in
 * {@link #apply} ({@code r1}, {@code r2}, {@code r3}) are not declared
 * anywhere in this class, so this file is a reading aid rather than
 * compilable Java — confirm against the original Scala source before relying
 * on details.
 */
public final class WithConsumer$ implements Serializable {
    /** The single instance of this class; the constructor is private. */
    public static final WithConsumer$ MODULE$ = new WithConsumer$();

    private WithConsumer$() {
    }

    /**
     * Serialization hook: substitutes a {@code ModuleSerializationProxy} for
     * this instance when it is written out.
     */
    private Object writeReplace() {
        return new ModuleSerializationProxy(WithConsumer$.class);
    }

    /**
     * Builds a {@code Resource} that yields a {@code WithConsumer}.
     *
     * <p>Steps visible in the code:
     * <ol>
     *   <li>Take {@code consumerSettings.blocker()}; if present, map it through
     *       {@code $anonfun$adapted$1}, otherwise fall back to
     *       {@code $anonfun$1} (which calls {@code Blockers$.MODULE$.consumer}).</li>
     *   <li>Map the resulting resource's value through
     *       {@code $anonfun$adapted$2} to obtain a {@code Blocking}.</li>
     *   <li>{@code flatMap} into {@code Resource$.MODULE$.make}, whose first
     *       argument combines {@code consumerSettings.createConsumer()} with a
     *       {@code Semaphore} created with 1 permit, and whose second argument
     *       closes the consumer using {@code consumerSettings.closeTimeout()}.</li>
     * </ol>
     *
     * @param consumerSettings source of the optional blocker, the consumer
     *                         factory and the close timeout
     * @param concurrent       type-class instance passed to the resource,
     *                         semaphore and mapping calls
     * @param contextShift     passed on to {@code Blocking$.MODULE$.fromBlocker}
     *                         via {@code $anonfun$3}
     * @return the resource produced by the final {@code flatMap}
     */
    public <F, K, V> Resource<F, WithConsumer<F>> apply(ConsumerSettings<F, K, V> consumerSettings, Concurrent<F> concurrent, ContextShift<F> contextShift) {
        // NOTE(review): r1/r2/r3 below are undeclared decompiler placeholders.
        // Judging by the helper signatures they presumably stand for `this`,
        // `concurrent` and `contextShift` — TODO confirm.
        return ((ResourceLike) consumerSettings.blocker().map((v2) -> {
            return $anonfun$adapted$1(r2, v2);
        }).getOrElse(() -> {
            return r1.$anonfun$1(r2);
        })).map((v3) -> {
            return $anonfun$adapted$2(r2, r3, v3);
        }, concurrent).flatMap(blocking -> {
            // First argument to make: pair the created consumer with a semaphore
            // created with 1 permit, then wrap both in an anonymous WithConsumer.
            return Resource$.MODULE$.make(implicits$.MODULE$.catsSyntaxTuple2Semigroupal(Tuple2$.MODULE$.apply(consumerSettings.createConsumer(), Semaphore$.MODULE$.apply(1L, concurrent))).mapN((consumer, semaphore) -> {
                return new WithConsumer<F>(blocking, consumer, semaphore) { // from class: fs2.kafka.internal.WithConsumer$$anon$1
                    // Captured values copied into fields by the initializer block below.
                    private final Blocking blocking_$1;
                    private final Consumer consumer$1;
                    private final Semaphore semaphore$1;

                    {
                        this.blocking_$1 = blocking;
                        this.consumer$1 = consumer;
                        this.semaphore$1 = semaphore;
                    }

                    /**
                     * Applies {@code function2} to the captured consumer and
                     * blocking helper, and passes the result to
                     * {@code semaphore.withPermit}. Since the semaphore was
                     * created with 1 permit, this presumably serializes
                     * access to the consumer — confirm against the
                     * Semaphore documentation.
                     */
                    @Override // fs2.kafka.internal.WithConsumer
                    public Object apply(Function2 function2) {
                        return this.semaphore$1.withPermit(function2.apply(this.consumer$1, this.blocking_$1));
                    }
                };
            }, concurrent, concurrent), withConsumer -> {
                // Second argument to make: close the consumer with the configured
                // close timeout, converted through FiniteDurationSyntax.asJava.
                // NOTE(review): `syntax$` / `syntax$FiniteDurationSyntax$` have no
                // visible import here; presumably same-package or dropped by the
                // decompiler — verify.
                return withConsumer.blocking(consumer2 -> {
                    consumer2.close(syntax$FiniteDurationSyntax$.MODULE$.asJava$extension(syntax$.MODULE$.FiniteDurationSyntax(consumerSettings.closeTimeout())));
                });
            }, concurrent);
        });
    }

    /**
     * Wraps {@code executionContext} in a new {@code Blocker} and lifts it with
     * {@code Resource$.MODULE$.pure}.
     */
    private final /* synthetic */ Resource $anonfun$2(Concurrent concurrent, ExecutionContext executionContext) {
        return Resource$.MODULE$.pure(new Blocker(executionContext), concurrent);
    }

    /**
     * Adapter for {@code $anonfun$2}: accepts the argument as {@code Object},
     * and, unless it is {@code null}, casts it to {@code Blocker} and extracts
     * its {@code blockingContext()} before delegating.
     */
    private final Resource $anonfun$adapted$1(Concurrent concurrent, Object obj) {
        return $anonfun$2(concurrent, obj == null ? null : ((Blocker) obj).blockingContext());
    }

    /**
     * Fallback used by {@code apply} when {@code consumerSettings.blocker()} is
     * empty: delegates to {@code Blockers$.MODULE$.consumer}.
     */
    private final Resource $anonfun$1(Concurrent concurrent) {
        return Blockers$.MODULE$.consumer(concurrent);
    }

    /**
     * Builds a {@code Blocking} from the given execution context via
     * {@code Blocking$.MODULE$.fromBlocker}.
     */
    private final /* synthetic */ Blocking $anonfun$3(Concurrent concurrent, ContextShift contextShift, ExecutionContext executionContext) {
        return Blocking$.MODULE$.fromBlocker(executionContext, concurrent, contextShift);
    }

    /**
     * Adapter for {@code $anonfun$3}: accepts the argument as {@code Object},
     * and, unless it is {@code null}, casts it to {@code Blocker} and extracts
     * its {@code blockingContext()} before delegating.
     */
    private final Blocking $anonfun$adapted$2(Concurrent concurrent, ContextShift contextShift, Object obj) {
        return $anonfun$3(concurrent, contextShift, obj == null ? null : ((Blocker) obj).blockingContext());
    }
}
