package io.superflat.lagompb.readside;

import akka.Done;
import akka.Done$;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.scaladsl.adapter.package$TypedActorSystemOps$;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings$;
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess$;
import akka.kafka.ProducerSettings$;
import akka.kafka.scaladsl.SendProducer;
import akka.kafka.scaladsl.SendProducer$;
import akka.projection.ProjectionBehavior;
import akka.projection.ProjectionBehavior$;
import akka.projection.ProjectionBehavior$Stop$;
import akka.projection.ProjectionId$;
import akka.projection.eventsourced.scaladsl.EventSourcedProvider$;
import akka.projection.slick.SlickProjection$;
import com.google.protobuf.any.Any;
import io.superflat.lagompb.ConfigReader$;
import io.superflat.lagompb.GlobalException;
import io.superflat.lagompb.ProtosRegistry$;
import io.superflat.lagompb.encryption.ProtoEncryption;
import io.superflat.lagompb.protobuf.core.KafkaEvent$;
import io.superflat.lagompb.protobuf.core.MetaData;
import io.superflat.lagompb.protobuf.core.StateWrapper;
import io.superflat.lagompb.protobuf.core.StateWrapper$;
import io.superflat.lagompb.protobuf.extensions.ExtensionsProto$;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.concurrent.ExecutionContext;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scalapb.GeneratedMessage;
import scalapb.GeneratedMessageCompanion;
import scalapb.descriptors.FieldDescriptor;
import scalapb.descriptors.Reads$;
import slick.basic.DatabaseConfig;
import slick.basic.DatabaseConfig$;
import slick.dbio.DBIOAction;
import slick.dbio.DBIOAction$;
import slick.dbio.Effect;
import slick.dbio.NoStream;
import slick.dbio.package$;
import slick.jdbc.PostgresProfile;

/* compiled from: KafkaPublisher.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005Mf!\u0002\n\u0014\u0003\u0003a\u0002\u0002\u0003\u0015\u0001\u0005\u0003\u0005\u000b\u0011B\u0015\t\u00119\u0002!\u0011!Q\u0001\f=B\u0001\"\u000e\u0001\u0003\u0002\u0003\u0006YA\u000e\u0005\u0006\u0019\u0002!\t!\u0014\u0005\bC\u0002\u0011\r\u0011\"\u0002c\u0011\u0019Y\u0007\u0001)A\u0007G\"9A\u000e\u0001b\u0001\n\u0003i\u0007BB9\u0001A\u0003%a\u000eC\u0004s\u0001\t\u0007I\u0011C:\t\u000f\u0005\u0015\u0001\u0001)A\u0005i\"I\u0011q\u0001\u0001C\u0002\u0013E\u0011\u0011\u0002\u0005\t\u0003C\u0001\u0001\u0015!\u0003\u0002\f!A\u00111\u0005\u0001!\u0002\u0013\t)\u0003C\u0004\u00026\u0001!)%a\u000e\t\u000f\u0005\u0005\u0006\u0001\"\u0001\u0002$\"9\u00111\u0016\u0001\u0007\u0002\u0005%\u0001bBAW\u0001\u0019\u0005\u0011q\u0016\u0002\u000f\u0017\u000647.\u0019)vE2L7\u000f[3s\u0015\t!R#\u0001\u0005sK\u0006$7/\u001b3f\u0015\t1r#A\u0004mC\u001e|W\u000e\u001d2\u000b\u0005aI\u0012!C:va\u0016\u0014h\r\\1u\u0015\u0005Q\u0012AA5p\u0007\u0001)\"!H)\u0014\u0007\u0001qB\u0005\u0005\u0002 E5\t\u0001EC\u0001\"\u0003\u0015\u00198-\u00197b\u0013\t\u0019\u0003E\u0001\u0004B]f\u0014VM\u001a\t\u0003K\u0019j\u0011aE\u0005\u0003OM\u0011a\"\u0012<f]R\u0004&o\\2fgN|'/\u0001\u0006f]\u000e\u0014\u0018\u0010\u001d;j_:\u0004\"A\u000b\u0017\u000e\u0003-R!\u0001K\u000b\n\u00055Z#a\u0004)s_R|WI\\2ssB$\u0018n\u001c8\u0002\u0005\u0015\u001c\u0007C\u0001\u00194\u001b\u0005\t$B\u0001\u001a!\u0003)\u0019wN\\2veJ,g\u000e^\u0005\u0003iE\u0012\u0001#\u0012=fGV$\u0018n\u001c8D_:$X\r\u001f;\u0002\u0017\u0005\u001cGo\u001c:TsN$X-\u001c\u0019\u0003o\r\u00032\u0001O 
B\u001b\u0005I$B\u0001\u001e<\u0003\u0015!\u0018\u0010]3e\u0015\taT(A\u0003bGR|'OC\u0001?\u0003\u0011\t7n[1\n\u0005\u0001K$aC!di>\u00148+_:uK6\u0004\"AQ\"\r\u0001\u0011IAiAA\u0001\u0002\u0003\u0015\t!\u0012\u0002\u0004?\u0012\n\u0014C\u0001$J!\tyr)\u0003\u0002IA\t9aj\u001c;iS:<\u0007CA\u0010K\u0013\tY\u0005EA\u0002B]f\fa\u0001P5oSRtDC\u0001(a)\ry%l\u0017\t\u0004K\u0001\u0001\u0006C\u0001\"R\t\u0015\u0011\u0006A1\u0001T\u0005\u0005\u0019\u0016C\u0001$U!\t)\u0006,D\u0001W\u0015\u00059\u0016aB:dC2\f\u0007OY\u0005\u00033Z\u0013\u0001cR3oKJ\fG/\u001a3NKN\u001c\u0018mZ3\t\u000b9\"\u00019A\u0018\t\u000bU\"\u00019\u0001/1\u0005u{\u0006c\u0001\u001d@=B\u0011!i\u0018\u0003\n\tn\u000b\t\u0011!A\u0003\u0002\u0015CQ\u0001\u000b\u0003A\u0002%\n1\u0001\\8h+\u0005\u0019\u0007C\u00013j\u001b\u0005)'B\u00014h\u0003\u0015\u0019HN\u001a\u001bk\u0015\u0005A\u0017aA8sO&\u0011!.\u001a\u0002\u0007\u0019><w-\u001a:\u0002\t1|w\rI\u0001\u000faJ|G-^2fe\u000e{gNZ5h+\u0005q\u0007CA\u0013p\u0013\t\u00018CA\u0006LC\u001a\\\u0017mQ8oM&<\u0017a\u00049s_\u0012,8-\u001a:D_:4\u0017n\u001a\u0011\u0002\u0011\u0011\u00147i\u001c8gS\u001e,\u0012\u0001\u001e\t\u0004kjdX\"\u0001<\u000b\u0005]D\u0018!\u00022bg&\u001c'\"A=\u0002\u000bMd\u0017nY6\n\u0005m4(A\u0004#bi\u0006\u0014\u0017m]3D_:4\u0017n\u001a\t\u0004{\u0006\u0005Q\"\u0001@\u000b\u0005}D\u0018\u0001\u00026eE\u000eL1!a\u0001\u007f\u0005=\u0001vn\u001d;he\u0016\u001c\bK]8gS2,\u0017!\u00033c\u0007>tg-[4!\u0003\u001d\u0011\u0017m]3UC\u001e,\"!a\u0003\u0011\t\u00055\u00111\u0004\b\u0005\u0003\u001f\t9\u0002E\u0002\u0002\u0012\u0001j!!a\u0005\u000b\u0007\u0005U1$\u0001\u0004=e>|GOP\u0005\u0004\u00033\u0001\u0013A\u0002)sK\u0012,g-\u0003\u0003\u0002\u001e\u0005}!AB*ue&twMC\u0002\u0002\u001a\u0001\n\u0001BY1tKR\u000bw\rI\u0001\rg\u0016tG\r\u0015:pIV\u001cWM\u001d\t\t\u0003O\t\t$a\u0003\u0002\f5\u0011\u0011\u0011\u0006\u0006\u0005\u0003W\ti#\u0001\u0005tG\u0006d\u0017\rZ:m\u0015\r\ty#P\u0001\u0006W\u000647.Y\u0005\u0005\u0003g\tIC\u0001\u0007TK:$\u0007K]8ek\u
000e,'/A\u0004qe>\u001cWm]:\u0015\u0019\u0005e\u00121LA7\u0003\u000f\u000bY)a$\u0011\r\u0005m\u0012QJA*\u001d\u0011\ti$a\u0012\u000f\t\u0005}\u00121\t\b\u0005\u0003#\t\t%C\u0001z\u0013\r\t)\u0005_\u0001\u0005I\nLw.\u0003\u0003\u0002J\u0005-\u0013a\u00029bG.\fw-\u001a\u0006\u0004\u0003\u000bB\u0018\u0002BA(\u0003#\u0012A\u0001\u0012\"J\u001f*!\u0011\u0011JA&!\u0011\t)&a\u0016\u000e\u0003uJ1!!\u0017>\u0005\u0011!uN\\3\t\u000f\u0005uc\u00021\u0001\u0002`\u0005!1m\\7qa\u0011\t\t'!\u001b\u0011\u000bU\u000b\u0019'a\u001a\n\u0007\u0005\u0015dKA\rHK:,'/\u0019;fI6+7o]1hK\u000e{W\u000e]1oS>t\u0007c\u0001\"\u0002j\u0011Y\u00111NA.\u0003\u0003\u0005\tQ!\u0001T\u0005\ryFE\r\u0005\b\u0003_r\u0001\u0019AA9\u0003\u0015)g/\u001a8u!\u0011\t\u0019(!\"\u000e\u0005\u0005U$\u0002BA<\u0003s\n1!\u00198z\u0015\u0011\tY(! \u0002\u0011A\u0014x\u000e^8ck\u001aTA!a \u0002\u0002\u00061qm\\8hY\u0016T!!a!\u0002\u0007\r|W.C\u0002L\u0003kBq!!#\u000f\u0001\u0004\tY!\u0001\u0005fm\u0016tG\u000fV1h\u0011\u001d\tiI\u0004a\u0001\u0003c\naB]3tk2$\u0018N\\4Ti\u0006$X\rC\u0004\u0002\u0012:\u0001\r!a%\u0002\t5,G/\u0019\t\u0005\u0003+\u000bi*\u0004\u0002\u0002\u0018*!\u0011\u0011TAN\u0003\u0011\u0019wN]3\u000b\u0007\u0005mT#\u0003\u0003\u0002 \u0006]%\u0001C'fi\u0006$\u0015\r^1\u0002\t%t\u0017\u000e\u001e\u000b\u0003\u0003K\u00032aHAT\u0013\r\tI\u000b\t\u0002\u0005+:LG/\u0001\bqe>TWm\u0019;j_:t\u0015-\\3\u0002/\u0005<wM]3hCR,7\u000b^1uK\u000e{W\u000e]1oS>tWCAAY!\u0011)\u00161\r)")
/* loaded from: input_file:io/superflat/lagompb/readside/KafkaPublisher.class */
/**
 * Read-side event processor that publishes persisted events, together with the resulting
 * state and its metadata, to a Kafka topic.
 *
 * <p>Concrete subclasses supply the projection name and the aggregate state companion.
 * Calling {@link #init()} starts one projection per event tag through a sharded daemon
 * process; each projection hands the events it reads to {@link #process}.
 *
 * <p>NOTE(review): this file is decompiled Scala; parameter names such as {@code any},
 * {@code str} and {@code any2} are decompiler-generated.
 *
 * @param <S> the ScalaPB message type of the aggregate state
 */
public abstract class KafkaPublisher<S extends GeneratedMessage> implements EventProcessor {
    private final ProtoEncryption encryption;
    private final ExecutionContext ec;
    private final ActorSystem<?> actorSystem;
    private final KafkaConfig producerConfig;
    private final DatabaseConfig<PostgresProfile> dbConfig;
    private final SendProducer<String, String> sendProducer;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final String baseTag = ConfigReader$.MODULE$.eventsConfig().tagName();

    /** Returns the logger bound to the concrete publisher class. */
    public final Logger log() {
        return this.log;
    }

    /** Returns the Kafka settings read from the {@code lagompb.projection.kafka} config section. */
    public KafkaConfig producerConfig() {
        return this.producerConfig;
    }

    /** Returns the Slick database configuration read from {@code akka.projection.slick}. */
    public DatabaseConfig<PostgresProfile> dbConfig() {
        return this.dbConfig;
    }

    /** Returns the base event tag name; the projection index is appended to it to form a tag. */
    public String baseTag() {
        return this.baseTag;
    }

    /**
     * Publishes one event and its resulting state to the configured Kafka topic.
     *
     * <p>The record key is the string value of the event field flagged as partition key in
     * its Kafka extension options. The record value is the printed form of a
     * {@code KafkaEvent} carrying the event, the state wrapper (state plus metadata), the
     * partition key and the service name.
     *
     * @param generatedMessageCompanion companion used to inspect and parse the event message
     * @param any the packed event
     * @param str not used by this implementation
     * @param any2 the packed state published alongside the event
     * @param metaData metadata attached to the published state wrapper
     * @return an action that completes with {@code Done} once the record has been sent, or a
     *     failed action carrying a {@code GlobalException} when the event type declares no
     *     partition key field
     */
    @Override // io.superflat.lagompb.readside.EventProcessor
    public final DBIOAction<Done, NoStream, Effect.All> process(GeneratedMessageCompanion<? extends GeneratedMessage> generatedMessageCompanion, Any any, String str, Any any2, MetaData metaData) {
        Option<FieldDescriptor> partitionKeyField = generatedMessageCompanion.scalaDescriptor().fields().find(candidate -> {
            return BoxesRunTime.boxToBoolean($anonfun$process$1(candidate));
        });

        if (partitionKeyField.isEmpty()) {
            return DBIOAction$.MODULE$.failed(new GlobalException("No partition key field is defined for event " + any.typeUrl()));
        }

        FieldDescriptor fieldDescriptor = partitionKeyField.get();
        if (fieldDescriptor == null) {
            // Mirrors the Scala pattern match: a defined option holding null matches no case.
            throw new MatchError(partitionKeyField);
        }

        // Parse the event once; the same key is used for the record and inside the payload.
        String partitionKey = (String) generatedMessageCompanion.parseFrom(any.value().toByteArray()).getField(fieldDescriptor).as(Reads$.MODULE$.stringReads());

        StateWrapper stateWrapper = new StateWrapper(StateWrapper$.MODULE$.apply$default$1(), StateWrapper$.MODULE$.apply$default$2()).withMeta(metaData).withState(any2);
        String payload = ProtosRegistry$.MODULE$.printer().print(KafkaEvent$.MODULE$.defaultInstance().withEvent(any).withState(stateWrapper).withPartitionKey(partitionKey).withServiceName(ConfigReader$.MODULE$.serviceName()));
        ProducerRecord<String, String> record = new ProducerRecord<>(producerConfig().topic(), partitionKey, payload);

        return package$.MODULE$.DBIO().from(this.sendProducer.send(record).map(recordMetadata -> {
            this.log().info("Published event [{}] and state [{}] to topic/partition {}/{}", new Object[]{any.typeUrl(), any2.typeUrl(), this.producerConfig().topic(), BoxesRunTime.boxToInteger(recordMetadata.partition())});
            return Done$.MODULE$;
        }, this.ec));
    }

    /**
     * Starts the projections through a sharded daemon process.
     *
     * <p>The number of instances equals the number of configured event tags, and
     * {@code ProjectionBehavior.Stop} is registered as the stop message.
     */
    public void init() {
        ShardedDaemonProcess$.MODULE$.apply(this.actorSystem).init(projectionName(), ConfigReader$.MODULE$.allEventTags().size(), obj -> {
            return $anonfun$init$1(this, BoxesRunTime.unboxToInt(obj));
        }, ShardedDaemonProcessSettings$.MODULE$.apply(this.actorSystem), new Some(ProjectionBehavior$Stop$.MODULE$), ClassTag$.MODULE$.apply(ProjectionBehavior.Command.class));
    }

    /** Returns the name under which the projection and its daemon process are registered. */
    public abstract String projectionName();

    /** Returns the ScalaPB companion of the aggregate state message type. */
    public abstract GeneratedMessageCompanion<S> aggregateStateCompanion();

    /**
     * Compiler-generated predicate: tells whether the field's Kafka extension option is
     * present and marks the field as partition key.
     */
    public static final /* synthetic */ boolean $anonfun$process$1(FieldDescriptor fieldDescriptor) {
        return ((Option) fieldDescriptor.getOptions().extension(ExtensionsProto$.MODULE$.kafka())).exists(kafkaRule -> {
            return BoxesRunTime.boxToBoolean(kafkaRule.partitionKey());
        });
    }

    /**
     * Compiler-generated factory: builds the projection behavior for the daemon instance
     * with index {@code i}. The tag is the base tag followed by the index; events are read
     * by tag from the {@code jdbc-read-journal} plugin and handled by an
     * {@code EventsReader} that delegates to the given publisher.
     */
    public static final /* synthetic */ Behavior $anonfun$init$1(KafkaPublisher kafkaPublisher, int i) {
        String tagName = kafkaPublisher.baseTag() + i;
        return ProjectionBehavior$.MODULE$.apply(SlickProjection$.MODULE$.exactlyOnce(ProjectionId$.MODULE$.apply(kafkaPublisher.projectionName(), tagName), EventSourcedProvider$.MODULE$.eventsByTag(kafkaPublisher.actorSystem, "jdbc-read-journal", tagName), kafkaPublisher.dbConfig(), () -> {
            return new EventsReader(tagName, kafkaPublisher.encryption, kafkaPublisher);
        }, ClassTag$.MODULE$.apply(PostgresProfile.class), kafkaPublisher.actorSystem));
    }

    /**
     * Creates the publisher and eagerly builds its Kafka producer and database configuration
     * from the actor system's config.
     *
     * @param protoEncryption encryption handed to the events reader of each projection
     * @param executionContext context on which the send result is mapped to {@code Done}
     * @param actorSystem actor system providing configuration and hosting the projections
     */
    public KafkaPublisher(ProtoEncryption protoEncryption, ExecutionContext executionContext, ActorSystem<?> actorSystem) {
        this.encryption = protoEncryption;
        this.ec = executionContext;
        this.actorSystem = actorSystem;
        // The path previously carried a stray leading space (" lagompb..."); use the exact key.
        this.producerConfig = KafkaConfig$.MODULE$.apply(actorSystem.settings().config().getConfig("lagompb.projection.kafka"));
        this.dbConfig = DatabaseConfig$.MODULE$.forConfig("akka.projection.slick", actorSystem.settings().config(), DatabaseConfig$.MODULE$.forConfig$default$3(), ClassTag$.MODULE$.apply(PostgresProfile.class));
        // Goes through the accessor, as the original did, so overriding subclasses are still honoured.
        this.sendProducer = SendProducer$.MODULE$.apply(ProducerSettings$.MODULE$.apply(actorSystem, new StringSerializer(), new StringSerializer()).withBootstrapServers(producerConfig().bootstrapServers()), package$TypedActorSystemOps$.MODULE$.toClassic$extension(akka.actor.typed.scaladsl.adapter.package$.MODULE$.TypedActorSystemOps(actorSystem)));
    }
}
