package fs2.kafka;

import cats.Apply;
import cats.effect.Concurrent;
import cats.effect.ConcurrentEffect;
import cats.effect.ContextShift;
import cats.effect.IO$;
import cats.effect.Resource;
import cats.effect.concurrent.Deferred;
import cats.effect.concurrent.Deferred$;
import cats.implicits$;
import cats.syntax.MonadErrorRethrowOps$;
import fs2.Chunk;
import fs2.Chunk$;
import fs2.Stream;
import fs2.Stream$;
import fs2.internal.FreeC;
import fs2.kafka.KafkaProducer;
import fs2.kafka.internal.WithProducer;
import fs2.kafka.internal.converters$;
import java.io.Serializable;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2$;
import scala.collection.immutable.Map;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;

/* compiled from: KafkaProducer.scala */
/* loaded from: input_file:fs2/kafka/KafkaProducer$.class */
/**
 * Singleton holder (single {@code MODULE$} instance, private constructor) for the factory and
 * helper methods that build and drive {@code KafkaProducer} instances.
 *
 * <p>NOTE(review): this file looks like decompiler output. Several lambdas reference names such
 * as {@code r1}..{@code r7} that are not declared anywhere in the class; they are presumably
 * captured variables the decompiler could not resolve. Confirm against the original source
 * before relying on those bodies.
 */
public final class KafkaProducer$ implements Serializable {
    /** The one and only instance of this class. */
    public static final KafkaProducer$ MODULE$ = new KafkaProducer$();

    // Private: instances are only created through MODULE$ above.
    private KafkaProducer$() {
    }

    /**
     * Serialization hook: substitutes a {@code ModuleSerializationProxy} for this class.
     * Presumably this keeps the singleton unique across deserialization; verify against
     * the scala-library documentation.
     */
    private Object writeReplace() {
        return new ModuleSerializationProxy(KafkaProducer$.class);
    }

    /**
     * Builds a {@code Resource} yielding a metrics-capable producer.
     *
     * <p>Obtains a connection resource from {@code KafkaProducerConnection$} using the given
     * settings, then effectfully maps each connection through
     * {@code withSerializersFrom(producerSettings)}.
     *
     * @param producerSettings settings passed both to the connection factory and to
     *                         {@code withSerializersFrom}
     * @param concurrentEffect effect type-class instance, also used for {@code evalMap}
     * @param contextShift     passed through to the connection factory
     * @return the resulting resource
     */
    public <F, K, V> Resource<F, KafkaProducer.Metrics<F, K, V>> resource(ProducerSettings<F, K, V> producerSettings, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift) {
        return KafkaProducerConnection$.MODULE$.resource(producerSettings, concurrentEffect, contextShift).evalMap(kafkaProducerConnection -> {
            return kafkaProducerConnection.withSerializersFrom(producerSettings);
        }, concurrentEffect);
    }

    /**
     * Creates a {@code KafkaProducer.Metrics} backed by the given {@code WithProducer} and
     * key/value serializers.
     *
     * @param withProducer     accessor used to run actions against the underlying producer
     * @param serializer       key serializer
     * @param serializer2      value serializer
     * @param concurrentEffect effect type-class instance used for traversal and mapping
     * @return an anonymous implementation capturing the four arguments
     */
    public <F, K, V> KafkaProducer.Metrics<F, K, V> from(final WithProducer<F> withProducer, final Serializer<F, K> serializer, final Serializer<F, V> serializer2, final ConcurrentEffect<F> concurrentEffect) {
        // NOTE(review): constructor arguments on an anonymous class whose methods are annotated
        // as overriding KafkaProducer / KafkaProducer.Metrics look like a decompiler artifact.
        return new KafkaProducer.Metrics<F, K, V>(withProducer, serializer, serializer2, concurrentEffect) { // from class: fs2.kafka.KafkaProducer$$anon$1
            // Captured arguments of from(...), stored with raw types.
            private final WithProducer withProducer$1;
            private final Serializer keySerializer$1;
            private final Serializer valueSerializer$1;
            private final ConcurrentEffect evidence$1$1;

            {
                this.withProducer$1 = withProducer;
                this.keySerializer$1 = serializer;
                this.valueSerializer$1 = serializer2;
                this.evidence$1$1 = concurrentEffect;
            }

            /**
             * Produces every record in {@code producerRecords} through the underlying producer.
             *
             * <p>Outer step: traverses {@code producerRecords.records()} with
             * {@code produceRecord(...)}. Inner step: sequences the resulting chunk and maps it
             * through the static helper that builds a {@code ProducerResult}.
             */
            @Override // fs2.kafka.KafkaProducer
            public Object produce(ProducerRecords producerRecords) {
                return this.withProducer$1.apply((producer, blocking) -> {
                    return implicits$.MODULE$.toFunctorOps(implicits$.MODULE$.toTraverseOps(producerRecords.records(), Chunk$.MODULE$.instance()).traverse(KafkaProducer$.MODULE$.produceRecord(this.keySerializer$1, this.valueSerializer$1, producer, this.evidence$1$1), this.evidence$1$1), this.evidence$1$1).map(chunk -> {
                        return implicits$.MODULE$.toFunctorOps(implicits$.MODULE$.toTraverseOps(chunk, Chunk$.MODULE$.instance()).sequence($less$colon$less$.MODULE$.refl(), this.evidence$1$1), this.evidence$1$1).map((v1) -> {
                            // NOTE(review): r1 is undeclared; the helper's first parameter is a
                            // ProducerRecords, so this is presumably `producerRecords` - confirm.
                            return KafkaProducer$.fs2$kafka$KafkaProducer$$anon$1$$_$produce$$anonfun$1$$anonfun$1$$anonfun$1(r1, v1);
                        });
                    });
                });
            }

            /**
             * Runs the metrics helper through {@code withProducer.blocking}; the helper converts
             * {@code producer.metrics()} to a Scala immutable map.
             */
            @Override // fs2.kafka.KafkaProducer.Metrics
            public Object metrics() {
                return this.withProducer$1.blocking(KafkaProducer$::fs2$kafka$KafkaProducer$$anon$1$$_$metrics$$anonfun$1);
            }

            /** Returns {@code "KafkaProducer$"} followed by this instance's identity hash code. */
            public String toString() {
                return "KafkaProducer$" + System.identityHashCode(this);
            }
        };
    }

    /**
     * Returns the given {@code ConcurrentEffect} unchanged.
     * NOTE(review): presumably the erased form of a partially-applied builder in the original
     * source - confirm.
     */
    public <F> ConcurrentEffect resource(ConcurrentEffect<F> concurrentEffect) {
        return concurrentEffect;
    }

    /**
     * Wraps {@link #resource(ProducerSettings, ConcurrentEffect, ContextShift)} with
     * {@code Stream$.MODULE$.resource}.
     */
    public <F, K, V> FreeC stream(ProducerSettings<F, K, V> producerSettings, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift) {
        return Stream$.MODULE$.resource(resource(producerSettings, concurrentEffect, contextShift));
    }

    /**
     * Returns the given {@code ConcurrentEffect} unchanged (same shape as the one-argument
     * {@code resource} overload).
     */
    public <F> ConcurrentEffect stream(ConcurrentEffect<F> concurrentEffect) {
        return concurrentEffect;
    }

    /**
     * Returns a function that produces a single record through {@code producer}.
     *
     * <p>For each input record the returned function:
     * <ol>
     *   <li>converts it with {@code asJavaRecord};</li>
     *   <li>creates a {@code Deferred};</li>
     *   <li>delays a call to the private send helper;</li>
     *   <li>replaces that delayed effect's result (via {@code as}) with the rethrown
     *       {@code deferred.get()} effect.</li>
     * </ol>
     *
     * @param serializer       key serializer
     * @param serializer2      value serializer
     * @param producer         Kafka producer of byte-array keys and values
     * @param concurrentEffect effect type-class instance
     */
    public <F, K, V> Function1<ProducerRecord<K, V>, Object> produceRecord(Serializer<F, K> serializer, Serializer<F, V> serializer2, Producer<byte[], byte[]> producer, ConcurrentEffect<F> concurrentEffect) {
        // NOTE(review): the nested lambda below reuses the parameter name `producerRecord`
        // (decompiler naming); the inner one receives the result of asJavaRecord.
        return producerRecord -> {
            return implicits$.MODULE$.toFlatMapOps(asJavaRecord(serializer, serializer2, producerRecord, concurrentEffect), concurrentEffect).flatMap(producerRecord -> {
                return implicits$.MODULE$.toFlatMapOps(Deferred$.MODULE$.apply(concurrentEffect), concurrentEffect).flatMap(deferred -> {
                    return implicits$.MODULE$.toFunctorOps(concurrentEffect.delay(() -> {
                        // NOTE(review): r2..r7 are undeclared placeholders; judging by the
                        // helper's signature they are presumably this object, the producer,
                        // the effect instance, both records and the deferred - confirm.
                        return r2.produceRecord$$anonfun$4$$anonfun$3$$anonfun$3$$anonfun$3(r3, r4, r5, r6, r7);
                    }), concurrentEffect).as(MonadErrorRethrowOps$.MODULE$.rethrow$extension(implicits$.MODULE$.catsSyntaxMonadErrorRethrow(deferred.get(), concurrentEffect), concurrentEffect));
                });
            });
        };
    }

    /**
     * Returns a stream-to-stream function that delegates to {@code pipe$$anonfun$adapted$1}.
     * NOTE(review): r1..r3 are undeclared; presumably this method's three parameters - confirm.
     */
    public <F, K, V, P> Function1<FreeC, FreeC> pipe(ProducerSettings<F, K, V> producerSettings, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift) {
        return (v4) -> {
            return pipe$$anonfun$adapted$1(r1, r2, r3, v4);
        };
    }

    /**
     * Returns a stream-to-stream function that delegates to {@code pipe$$anonfun$adapted$2},
     * the variant that uses an already-created producer.
     * NOTE(review): r1..r3 are undeclared; presumably this method's three parameters - confirm.
     */
    public <F, K, V, P> Function1<FreeC, FreeC> pipe(ProducerSettings<F, K, V> producerSettings, KafkaProducer<F, K, V> kafkaProducer, Concurrent<F> concurrent) {
        return (v4) -> {
            return pipe$$anonfun$adapted$2(r1, r2, r3, v4);
        };
    }

    /**
     * Serializes the record's key and value, each with the record's topic and headers, and
     * combines the two effects with {@code product}.
     */
    private <F, K, V> Object serializeToBytes(Serializer<F, K> serializer, Serializer<F, V> serializer2, ProducerRecord<K, V> producerRecord, Apply<F> apply) {
        F serialize = serializer.serialize(producerRecord.topic(), producerRecord.headers(), producerRecord.key());
        return implicits$.MODULE$.catsSyntaxSemigroupal(serialize, apply).product(serializer2.serialize(producerRecord.topic(), producerRecord.headers(), producerRecord.value()));
    }

    /**
     * Maps the serialized key/value pair into an
     * {@code org.apache.kafka.clients.producer.ProducerRecord}.
     *
     * <p>Partition and timestamp are each obtained through {@code fold}: the no-argument helper
     * returns {@code null}, the one-argument helper boxes the value. The topic and headers come
     * from the input record.
     *
     * @throws MatchError if the mapped tuple is {@code null}
     */
    private <F, K, V> Object asJavaRecord(Serializer<F, K> serializer, Serializer<F, V> serializer2, ProducerRecord<K, V> producerRecord, Apply<F> apply) {
        return implicits$.MODULE$.toFunctorOps(serializeToBytes(serializer, serializer2, producerRecord, apply), apply).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return new org.apache.kafka.clients.producer.ProducerRecord(producerRecord.topic(), (Integer) producerRecord.partition().fold(this::asJavaRecord$$anonfun$2$$anonfun$1, this::asJavaRecord$$anonfun$4$$anonfun$adapted$1), (Long) producerRecord.timestamp().fold(this::asJavaRecord$$anonfun$5$$anonfun$3, this::asJavaRecord$$anonfun$7$$anonfun$adapted$2), (byte[]) tuple2._1(), (byte[]) tuple2._2(), producerRecord.headers().asJava());
        });
    }

    /** Builds a {@code ProducerResult} from the produced chunk and the records' passthrough. */
    public static final /* synthetic */ ProducerResult fs2$kafka$KafkaProducer$$anon$1$$_$produce$$anonfun$1$$anonfun$1$$anonfun$1(ProducerRecords producerRecords, Chunk chunk) {
        return ProducerResult$.MODULE$.apply(chunk, producerRecords.passthrough());
    }

    /** Converts {@code producer.metrics()} to a Scala immutable {@code Map}. */
    public static final /* synthetic */ Map fs2$kafka$KafkaProducer$$anon$1$$_$metrics$$anonfun$1(Producer producer) {
        return converters$.MODULE$.collection().MapHasAsScala(producer.metrics()).asScala().toMap($less$colon$less$.MODULE$.refl());
    }

    /**
     * Sends {@code producerRecord2} through {@code producer} and returns the resulting
     * {@code Future}.
     *
     * <p>The send callback completes {@code deferred} with {@code Right((producerRecord,
     * recordMetadata))} when {@code exc} is {@code null}, otherwise with {@code Left(exc)}.
     * The completion is started with {@code runAsync} (its result callback is ignored and
     * returns {@code IO.unit}) and then run with {@code unsafeRunSync()}.
     */
    private final Future produceRecord$$anonfun$4$$anonfun$3$$anonfun$3$$anonfun$3(Producer producer, ConcurrentEffect concurrentEffect, ProducerRecord producerRecord, org.apache.kafka.clients.producer.ProducerRecord producerRecord2, Deferred deferred) {
        return producer.send(producerRecord2, (recordMetadata, exc) -> {
            concurrentEffect.runAsync(deferred.complete(exc == null ? scala.package$.MODULE$.Right().apply(Tuple2$.MODULE$.apply(producerRecord, recordMetadata)) : scala.package$.MODULE$.Left().apply(exc)), either -> {
                return IO$.MODULE$.unit();
            }).unsafeRunSync();
        });
    }

    /**
     * Applies the producer-based {@code pipe} overload to the stream wrapping {@code freeC} and
     * unwraps the result; returns {@code null} when the applied result is {@code null}.
     */
    private final /* synthetic */ FreeC pipe$$anonfun$1$$anonfun$1(ProducerSettings producerSettings, ConcurrentEffect concurrentEffect, FreeC freeC, KafkaProducer.Metrics metrics) {
        Object apply = pipe(producerSettings, (KafkaProducer) metrics, (Concurrent) concurrentEffect).apply(new Stream(freeC));
        if (apply == null) {
            return null;
        }
        return ((Stream) apply).fs2$Stream$$free();
    }

    /** Boxing adapter: wraps the result of {@code pipe$$anonfun$1$$anonfun$1} in a {@code Stream}. */
    private final Object pipe$$anonfun$2$$anonfun$adapted$1(ProducerSettings producerSettings, ConcurrentEffect concurrentEffect, FreeC freeC, KafkaProducer.Metrics metrics) {
        return new Stream(pipe$$anonfun$1$$anonfun$1(producerSettings, concurrentEffect, freeC, metrics));
    }

    /**
     * Flat-maps the producer stream created by {@code stream(...)} through the adapter above.
     * NOTE(review): r3..r5 are undeclared; presumably producerSettings, concurrentEffect and
     * freeC, matching the adapter's signature - confirm.
     */
    private final /* synthetic */ FreeC pipe$$anonfun$3(ProducerSettings producerSettings, ConcurrentEffect concurrentEffect, ContextShift contextShift, FreeC freeC) {
        return Stream$.MODULE$.flatMap$extension(stream(producerSettings, concurrentEffect, contextShift), (v4) -> {
            return pipe$$anonfun$2$$anonfun$adapted$1(r3, r4, r5, v4);
        });
    }

    /**
     * Boxing adapter for {@code pipe$$anonfun$3}: unwraps {@code obj} (a {@code Stream}, or
     * {@code null}) to its {@code FreeC} and wraps the result in a new {@code Stream}.
     */
    private final Object pipe$$anonfun$adapted$1(ProducerSettings producerSettings, ConcurrentEffect concurrentEffect, ContextShift contextShift, Object obj) {
        return new Stream(pipe$$anonfun$3(producerSettings, concurrentEffect, contextShift, obj == null ? null : ((Stream) obj).fs2$Stream$$free()));
    }

    /**
     * Evaluates {@code kafkaProducer.produce} on each element of the stream, then passes the
     * results through {@code mapAsync} with {@code producerSettings.parallelism()} and the
     * identity function.
     */
    private final /* synthetic */ FreeC pipe$$anonfun$6(ProducerSettings producerSettings, KafkaProducer kafkaProducer, Concurrent concurrent, FreeC freeC) {
        return Stream$.MODULE$.mapAsync$extension(Stream$.MODULE$.evalMap$extension(freeC, producerRecords -> {
            return kafkaProducer.produce(producerRecords);
        }), producerSettings.parallelism(), obj -> {
            return Predef$.MODULE$.identity(obj);
        }, concurrent);
    }

    /**
     * Boxing adapter for {@code pipe$$anonfun$6}: unwraps {@code obj} (a {@code Stream}, or
     * {@code null}) to its {@code FreeC} and wraps the result in a new {@code Stream}.
     */
    private final Object pipe$$anonfun$adapted$2(ProducerSettings producerSettings, KafkaProducer kafkaProducer, Concurrent concurrent, Object obj) {
        return new Stream(pipe$$anonfun$6(producerSettings, kafkaProducer, concurrent, obj == null ? null : ((Stream) obj).fs2$Stream$$free()));
    }

    /** Partition value used as the first (no-argument) branch of the partition fold: {@code null}. */
    private final Integer asJavaRecord$$anonfun$2$$anonfun$1() {
        return null;
    }

    /** Boxes a primitive partition number as an {@code Integer}. */
    private final /* synthetic */ Integer asJavaRecord$$anonfun$3$$anonfun$2(int i) {
        return (Integer) Predef$.MODULE$.identity(Predef$.MODULE$.int2Integer(i));
    }

    /** Adapter: unboxes {@code obj} to an int and delegates to the boxing helper above. */
    private final Integer asJavaRecord$$anonfun$4$$anonfun$adapted$1(Object obj) {
        return asJavaRecord$$anonfun$3$$anonfun$2(BoxesRunTime.unboxToInt(obj));
    }

    /** Timestamp value used as the first (no-argument) branch of the timestamp fold: {@code null}. */
    private final Long asJavaRecord$$anonfun$5$$anonfun$3() {
        return null;
    }

    /** Boxes a primitive timestamp as a {@code Long}. */
    private final /* synthetic */ Long asJavaRecord$$anonfun$6$$anonfun$4(long j) {
        return (Long) Predef$.MODULE$.identity(Predef$.MODULE$.long2Long(j));
    }

    /** Adapter: unboxes {@code obj} to a long and delegates to the boxing helper above. */
    private final Long asJavaRecord$$anonfun$7$$anonfun$adapted$2(Object obj) {
        return asJavaRecord$$anonfun$6$$anonfun$4(BoxesRunTime.unboxToLong(obj));
    }
}
