package pl.waw.ibspan.scala_mqtt_wrapper.pekko;

import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.stream.KillSwitches$;
import org.apache.pekko.stream.Materializer$;
import org.apache.pekko.stream.OverflowStrategy$;
import org.apache.pekko.stream.RestartSettings;
import org.apache.pekko.stream.RestartSettings$;
import org.apache.pekko.stream.UniqueKillSwitch;
import org.apache.pekko.stream.connectors.mqtt.streaming.Command;
import org.apache.pekko.stream.connectors.mqtt.streaming.Command$;
import org.apache.pekko.stream.connectors.mqtt.streaming.Connect$;
import org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags;
import org.apache.pekko.stream.connectors.mqtt.streaming.Event;
import org.apache.pekko.stream.connectors.mqtt.streaming.Event$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttSessionSettings;
import org.apache.pekko.stream.connectors.mqtt.streaming.PacketId;
import org.apache.pekko.stream.connectors.mqtt.streaming.PubAck$;
import org.apache.pekko.stream.connectors.mqtt.streaming.Publish;
import org.apache.pekko.stream.connectors.mqtt.streaming.Publish$;
import org.apache.pekko.stream.connectors.mqtt.streaming.Subscribe$;
import org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession;
import org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession$;
import org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.Mqtt$;
import org.apache.pekko.stream.scaladsl.BroadcastHub$;
import org.apache.pekko.stream.scaladsl.Flow;
import org.apache.pekko.stream.scaladsl.Keep$;
import org.apache.pekko.stream.scaladsl.MergeHub$;
import org.apache.pekko.stream.scaladsl.RestartSource$;
import org.apache.pekko.stream.scaladsl.Sink;
import org.apache.pekko.stream.scaladsl.Sink$;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import org.apache.pekko.stream.scaladsl.SourceQueueWithComplete;
import org.apache.pekko.stream.scaladsl.Tcp;
import org.apache.pekko.stream.scaladsl.Tcp$;
import org.apache.pekko.util.ByteString;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.Tuple3;
import scala.Tuple3$;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.List;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.package$;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.ScalaRunTime$;
import scala.util.Either;
import scala.util.Right;

/* compiled from: MqttClient.scala */
/* loaded from: input_file:pl/waw/ibspan/scala_mqtt_wrapper/pekko/MqttClient.class */
/**
 * MQTT client built on the Pekko Connectors MQTT streaming session API.
 *
 * <p>The constructor creates an {@link ActorMqttClientSession}, an outgoing TCP connection flow
 * and then materializes several streams on the supplied actor system: a command hub
 * (MergeHub -> KillSwitch -> BroadcastHub), a command queue feeding that hub, a restarting event
 * source broadcast through a BroadcastHub, and a publish sink (MergeHub) that forwards messages
 * to the session. {@link #shutdown()} stops those streams.
 *
 * <p>NOTE(review): this file is decompiler output of {@code MqttClient.scala}; it uses raw Scala
 * interop types and contains a few constructs that are not valid Java as written (flagged
 * inline). Treat it as a reading aid for the Scala original rather than compilable source.
 */
public class MqttClient {
    // Constructor arguments, stored as-is.
    private final MqttSettings mqttSettings;
    private final MqttSessionSettings mqttSessionSettings;
    private final Option loggingSettings;
    private final ActorSystem<?> system;
    // Name taken from the logging settings, or "" when no logging settings are given.
    private final String name;
    private final ActorMqttClientSession session;
    // Outgoing TCP connection flow to mqttSettings.host():mqttSettings.port().
    private final Flow tcpConnection;
    // Client session flow joined with the TCP connection flow.
    private final Flow sessionFlow;
    private final Command connectCommand;
    private final List subscribeCommands;
    // connectCommand prepended to subscribeCommands ($colon$colon is Scala's `::`).
    // NOTE(review): as a Java field initializer this would run before the constructor body
    // assigns connectCommand/subscribeCommands; presumably a decompiler artifact of Scala's
    // in-order field initialization — confirm against the Scala source.
    private final List initialCommands = subscribeCommands().$colon$colon(connectCommand());
    // Materialized values of the command hub (MergeHub sink, kill switch, BroadcastHub source).
    private final Sink commandMergeSink;
    private final UniqueKillSwitch commandMergeSinkKillSwitch;
    private final Source commandBroadcastSource;
    // Queue source (backpressure overflow strategy) and the queue obtained by running it
    // into commandMergeSink.
    private final Source commandQueueSource;
    private final SourceQueueWithComplete commandQueue;
    private final RestartSettings restartingEventSourceSettings;
    private final Source restartingEventSource;
    // Materialized values of the event broadcast stream (kill switch, BroadcastHub source).
    private final UniqueKillSwitch eventBroadcastSourceKillSwitch;
    private final Source eventBroadcastSource;
    // Some(future) of an internal Sink.ignore consumer when
    // withEventBroadcastSourceBackpressure() is false, otherwise None.
    private final Option eventBroadcastConsumerFuture;
    private final Source publishEventBroadcastSource;
    private final Source publishMergeSinkSource;
    private final Source publishMergeSinkSourceWithOptionalLogger;
    // Materialized values of the publish stream (MergeHub sink, kill switch, completion future).
    private final Sink publishMergeSink;
    private final UniqueKillSwitch publishMergeSinkKillSwitch;
    private final Future publishMergeSinkFuture;

    /**
     * Builds the session, flows and commands, and runs the internal streams.
     *
     * @param mqttSettings        connection, subscription, buffer-size and restart settings read
     *                            throughout this constructor
     * @param mqttSessionSettings settings passed to {@code ActorMqttClientSession.apply}
     * @param option              optional logging settings; when present, the internal sources
     *                            get a {@code log} stage plus the settings' attributes
     * @param actorSystem         actor system used for the session, TCP and stream materialization
     * @throws MatchError if a materialized tuple is {@code null} (decompiled Scala pattern match)
     */
    public MqttClient(MqttSettings mqttSettings, MqttSessionSettings mqttSessionSettings, Option<MqttLoggingSettings> option, ActorSystem<?> actorSystem) {
        Tuple2 tuple2;
        Tuple2 tuple22;
        this.mqttSettings = mqttSettings;
        this.mqttSessionSettings = mqttSessionSettings;
        this.loggingSettings = option;
        this.system = actorSystem;
        // Empty name when logging settings are absent, otherwise the configured name.
        this.name = (String) option.fold(MqttClient::$init$$$anonfun$1, mqttLoggingSettings -> {
            return mqttLoggingSettings.name();
        });
        this.session = ActorMqttClientSession$.MODULE$.apply(mqttSessionSettings, actorSystem);
        this.tcpConnection = Tcp$.MODULE$.apply(actorSystem).outgoingConnection(mqttSettings.host(), mqttSettings.port());
        this.sessionFlow = Mqtt$.MODULE$.clientSessionFlow(session(), mqttSettings.sessionId()).join(tcpConnection());
        this.connectCommand = Command$.MODULE$.apply(Connect$.MODULE$.apply(mqttSettings.clientId(), mqttSettings.connectFlags(), mqttSettings.username(), mqttSettings.password()));
        // One Subscribe command per configured subscription, each carrying a single
        // (topic name, flags) pair.
        this.subscribeCommands = ((IterableOnceOps) mqttSettings.subscriptions().map(mqttTopic -> {
            return Command$.MODULE$.apply(Subscribe$.MODULE$.apply(package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Tuple2$.MODULE$.apply(mqttTopic.name(), new ControlPacketFlags(mqttTopic.flags()))}))));
        })).toList();
        // Command hub: MergeHub source -> kill switch -> BroadcastHub sink.
        // Materializes ((sink, killSwitch), source).
        Tuple2 tuple23 = (Tuple2) MergeHub$.MODULE$.source(mqttSettings.commandMergeSinkPerProducerBufferSize()).viaMat(KillSwitches$.MODULE$.single(), Keep$.MODULE$.both()).toMat(BroadcastHub$.MODULE$.sink(mqttSettings.commandBroadcastSourceBufferSize()), Keep$.MODULE$.both()).run(Materializer$.MODULE$.matFromSystem(actorSystem));
        if (tuple23 == null || (tuple2 = (Tuple2) tuple23._1()) == null) {
            throw new MatchError(tuple23);
        }
        Tuple3 apply = Tuple3$.MODULE$.apply((Sink) tuple2._1(), (UniqueKillSwitch) tuple2._2(), (Source) tuple23._2());
        this.commandMergeSink = (Sink) apply._1();
        this.commandMergeSinkKillSwitch = (UniqueKillSwitch) apply._2();
        this.commandBroadcastSource = (Source) apply._3();
        this.commandQueueSource = Source$.MODULE$.queue(mqttSettings.commandQueueBufferSize(), OverflowStrategy$.MODULE$.backpressure());
        // Run the queue source (with an optional log stage) into the command merge sink and keep
        // the materialized queue.
        this.commandQueue = (SourceQueueWithComplete) ((Source) option.fold(this::$init$$$anonfun$4, mqttLoggingSettings2 -> {
            String sb = new StringBuilder(26).append(mqttLoggingSettings2.name()).append(" : (internal) commandQueue").toString();
            Function1 function1 = command -> {
                return new StringBuilder(8).append("event [").append(command).append("]").toString();
            };
            return commandQueueSource().log(sb, function1, commandQueueSource().log$default$3(sb, function1)).addAttributes(mqttLoggingSettings2.attributes());
        })).to(commandMergeSink()).run(Materializer$.MODULE$.matFromSystem(actorSystem));
        this.restartingEventSourceSettings = RestartSettings$.MODULE$.apply(mqttSettings.restartMinBackoff(), mqttSettings.restartMaxBackoff(), mqttSettings.restartRandomFactor()).withMaxRestarts(mqttSettings.maxRestarts(), mqttSettings.restartMinBackoff()).withLogSettings(RestartSettings$.MODULE$.createLogSettings(mqttSettings.restartLogLevel()));
        // The factory passed to withBackoff builds: initialCommands followed by the broadcast
        // command source, sent through the session flow, with an optional log stage.
        this.restartingEventSource = RestartSource$.MODULE$.withBackoff(restartingEventSourceSettings(), () -> {
            Source via = Source$.MODULE$.apply(initialCommands()).concatMat(commandBroadcastSource(), Keep$.MODULE$.right()).via(sessionFlow());
            return (Source) option.fold(() -> {
                // NOTE(review): `r1` is not declared anywhere visible; presumably the decompiler
                // lost the captured `via` here — confirm against the Scala source.
                return $init$$$anonfun$6$$anonfun$1(r1);
            }, mqttLoggingSettings3 -> {
                String sb = new StringBuilder(35).append(mqttLoggingSettings3.name()).append(" : (internal) restartingEventSource").toString();
                Function1 function1 = either -> {
                    return new StringBuilder(8).append("event [").append(either).append("]").toString();
                };
                return via.log(sb, function1, via.log$default$3(sb, function1)).addAttributes(mqttLoggingSettings3.attributes());
            });
        });
        // Event stream: for every Right(Event(Publish)) whose third Publish component is a Some
        // (the packet id), offer a PubAck command to the command queue; every element is passed
        // on unchanged. Then kill switch -> BroadcastHub sink, materializing (killSwitch, source).
        Tuple2 tuple24 = (Tuple2) restartingEventSource().map(either -> {
            if (either instanceof Right) {
                Right right = (Right) either;
                Event event = (Event) right.value();
                if (event != null) {
                    Event unapply = Event$.MODULE$.unapply(event);
                    Publish _1 = unapply._1();
                    unapply._2();
                    if (_1 instanceof Publish) {
                        Publish unapply2 = Publish$.MODULE$.unapply(_1);
                        unapply2._1();
                        unapply2._2();
                        Some _3 = unapply2._3();
                        unapply2._4();
                        if (_3 instanceof Some) {
                            Object value = _3.value();
                            // The result of offer(...) is not inspected here.
                            // NOTE(review): the null branch looks like a decompiler rendering of
                            // unboxing the PacketId wrapper — confirm.
                            commandQueue().offer(Command$.MODULE$.apply(PubAck$.MODULE$.apply(value == null ? BoxesRunTime.unboxToInt((Object) null) : ((PacketId) value).underlying())));
                            return right;
                        }
                    }
                }
            }
            return either;
        }).viaMat(KillSwitches$.MODULE$.single(), Keep$.MODULE$.right()).toMat(BroadcastHub$.MODULE$.sink(mqttSettings.eventBroadcastSourceBufferSize()), Keep$.MODULE$.both()).run(Materializer$.MODULE$.matFromSystem(actorSystem));
        if (tuple24 == null) {
            throw new MatchError(tuple24);
        }
        Tuple2 apply2 = Tuple2$.MODULE$.apply((UniqueKillSwitch) tuple24._1(), (Source) tuple24._2());
        this.eventBroadcastSourceKillSwitch = (UniqueKillSwitch) apply2._1();
        this.eventBroadcastSource = (Source) apply2._2();
        // Without event-broadcast backpressure, attach an internal consumer that runs the
        // broadcast source (optionally logged) into Sink.ignore and keep its future.
        this.eventBroadcastConsumerFuture = !mqttSettings.withEventBroadcastSourceBackpressure() ? Some$.MODULE$.apply(((Source) option.fold(this::$anonfun$3, mqttLoggingSettings3 -> {
            String sb = new StringBuilder(36).append(mqttLoggingSettings3.name()).append(" : (internal) eventBroadcastConsumer").toString();
            Function1 function1 = either2 -> {
                return new StringBuilder(8).append("event [").append(either2).append("]").toString();
            };
            return eventBroadcastSource().log(sb, function1, eventBroadcastSource().log$default$3(sb, function1)).addAttributes(mqttLoggingSettings3.attributes());
        })).runWith(Sink$.MODULE$.ignore(), Materializer$.MODULE$.matFromSystem(actorSystem))) : None$.MODULE$;
        // Filters/transforms broadcast events with the partial function MqttClient$$anon$1
        // (defined outside this file view).
        this.publishEventBroadcastSource = eventBroadcastSource().collect(new MqttClient$$anon$1());
        this.publishMergeSinkSource = MergeHub$.MODULE$.source(mqttSettings.publishMergeSinkPerProducerBufferSize()).viaMat(KillSwitches$.MODULE$.single(), Keep$.MODULE$.both());
        this.publishMergeSinkSourceWithOptionalLogger = (Source) option.fold(this::$init$$$anonfun$8, mqttLoggingSettings4 -> {
            String sb = new StringBuilder(30).append(mqttLoggingSettings4.name()).append(" : (internal) publishMergeSink").toString();
            Function1 function1 = mqttPublishMessage -> {
                return new StringBuilder(8).append("event [").append(mqttPublishMessage).append("]").toString();
            };
            return publishMergeSinkSource().log(sb, function1, publishMergeSinkSource().log$default$3(sb, function1)).addAttributes(mqttLoggingSettings4.attributes());
        });
        // Publish stream: each incoming MqttPublishMessage is turned into a Publish command and
        // sent to the session with `!` ($bang); the stream ends in Sink.ignore.
        // Materializes ((sink, killSwitch), future).
        // NOTE(review): the lambda below has no return on its normal path, which is not valid
        // for a mapping function in Java; presumably the Scala original yields Unit — confirm.
        Tuple2 tuple25 = (Tuple2) publishMergeSinkSourceWithOptionalLogger().map(mqttPublishMessage -> {
            if (mqttPublishMessage == null) {
                throw new MatchError(mqttPublishMessage);
            }
            MqttPublishMessage unapply = MqttPublishMessage$.MODULE$.unapply(mqttPublishMessage);
            ByteString _1 = unapply._1();
            String _2 = unapply._2();
            session().$bang(Command$.MODULE$.apply(Publish$.MODULE$.apply(unapply._3(), _2, _1)));
        }).toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.both()).run(Materializer$.MODULE$.matFromSystem(actorSystem));
        if (tuple25 == null || (tuple22 = (Tuple2) tuple25._1()) == null) {
            throw new MatchError(tuple25);
        }
        Tuple3 apply3 = Tuple3$.MODULE$.apply((Sink) tuple22._1(), (UniqueKillSwitch) tuple22._2(), (Future) tuple25._2());
        this.publishMergeSink = (Sink) apply3._1();
        this.publishMergeSinkKillSwitch = (UniqueKillSwitch) apply3._2();
        this.publishMergeSinkFuture = (Future) apply3._3();
    }

    // Accessors below return the corresponding field unchanged (Scala `val` getters).

    public MqttSettings mqttSettings() {
        return this.mqttSettings;
    }

    public MqttSessionSettings mqttSessionSettings() {
        return this.mqttSessionSettings;
    }

    public Option<MqttLoggingSettings> loggingSettings() {
        return this.loggingSettings;
    }

    public String name() {
        return this.name;
    }

    public ActorMqttClientSession session() {
        return this.session;
    }

    public Flow<ByteString, ByteString, Future<Tcp.OutgoingConnection>> tcpConnection() {
        return this.tcpConnection;
    }

    public Flow<Command<Nothing$>, Either<MqttCodec.DecodeError, Event<Nothing$>>, NotUsed> sessionFlow() {
        return this.sessionFlow;
    }

    public Command<Nothing$> connectCommand() {
        return this.connectCommand;
    }

    public List<Command<Nothing$>> subscribeCommands() {
        return this.subscribeCommands;
    }

    public List<Command<Nothing$>> initialCommands() {
        return this.initialCommands;
    }

    public Sink<Command<Nothing$>, NotUsed> commandMergeSink() {
        return this.commandMergeSink;
    }

    public UniqueKillSwitch commandMergeSinkKillSwitch() {
        return this.commandMergeSinkKillSwitch;
    }

    public Source<Command<Nothing$>, NotUsed> commandBroadcastSource() {
        return this.commandBroadcastSource;
    }

    public Source<Command<Nothing$>, SourceQueueWithComplete<Command<Nothing$>>> commandQueueSource() {
        return this.commandQueueSource;
    }

    public SourceQueueWithComplete<Command<Nothing$>> commandQueue() {
        return this.commandQueue;
    }

    public RestartSettings restartingEventSourceSettings() {
        return this.restartingEventSourceSettings;
    }

    public Source<Either<MqttCodec.DecodeError, Event<Nothing$>>, NotUsed> restartingEventSource() {
        return this.restartingEventSource;
    }

    public UniqueKillSwitch eventBroadcastSourceKillSwitch() {
        return this.eventBroadcastSourceKillSwitch;
    }

    public Source<Either<MqttCodec.DecodeError, Event<Nothing$>>, NotUsed> eventBroadcastSource() {
        return this.eventBroadcastSource;
    }

    public Option<Future<Done>> eventBroadcastConsumerFuture() {
        return this.eventBroadcastConsumerFuture;
    }

    public Source<MqttReceivedMessage, NotUsed> publishEventBroadcastSource() {
        return this.publishEventBroadcastSource;
    }

    public Source<MqttPublishMessage, Tuple2<Sink<MqttPublishMessage, NotUsed>, UniqueKillSwitch>> publishMergeSinkSource() {
        return this.publishMergeSinkSource;
    }

    public Source<MqttPublishMessage, Tuple2<Sink<MqttPublishMessage, NotUsed>, UniqueKillSwitch>> publishMergeSinkSourceWithOptionalLogger() {
        return this.publishMergeSinkSourceWithOptionalLogger;
    }

    public Sink<MqttPublishMessage, NotUsed> publishMergeSink() {
        return this.publishMergeSink;
    }

    public UniqueKillSwitch publishMergeSinkKillSwitch() {
        return this.publishMergeSinkKillSwitch;
    }

    public Future<Done> publishMergeSinkFuture() {
        return this.publishMergeSinkFuture;
    }

    /**
     * Stops the internal streams.
     *
     * <p>Completes the command queue and shuts down the command, publish and event kill
     * switches, in that order.
     *
     * @return a future that reduces three futures to {@code Done} on the actor system's
     *         execution context: the command queue's {@code watchCompletion()}, the publish
     *         stream's completion future, and the internal event consumer's future (or an
     *         already-successful {@code Done} when no internal consumer was started)
     * @throws MatchError if the consumer option is neither {@code Some} nor {@code None}
     */
    public Future<Done> shutdown() {
        Future successful;
        commandQueue().complete();
        commandMergeSinkKillSwitch().shutdown();
        publishMergeSinkKillSwitch().shutdown();
        eventBroadcastSourceKillSwitch().shutdown();
        Some eventBroadcastConsumerFuture = eventBroadcastConsumerFuture();
        if (eventBroadcastConsumerFuture instanceof Some) {
            successful = (Future) eventBroadcastConsumerFuture.value();
        } else {
            if (!None$.MODULE$.equals(eventBroadcastConsumerFuture)) {
                throw new MatchError(eventBroadcastConsumerFuture);
            }
            // No internal consumer was started, so there is nothing to wait for.
            successful = Future$.MODULE$.successful(Done$.MODULE$);
        }
        return Future$.MODULE$.reduceLeft(package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Future[]{commandQueue().watchCompletion(), publishMergeSinkFuture(), successful})), (done, done2) -> {
            return Done$.MODULE$;
        }, this.system.executionContext());
    }

    // Synthetic helpers below are the "logging settings absent" branches of the
    // Option.fold calls in the constructor.

    // Default client name when no logging settings are provided.
    private static final String $init$$$anonfun$1() {
        return "";
    }

    // Unlogged command queue source.
    private final Source $init$$$anonfun$4() {
        return commandQueueSource();
    }

    // Returns the given source unchanged (unlogged restarting event source).
    private static final Source $init$$$anonfun$6$$anonfun$1(Source source) {
        return source;
    }

    // Unlogged event broadcast source for the internal consumer.
    private final Source $anonfun$3() {
        return eventBroadcastSource();
    }

    // Unlogged publish merge sink source.
    private final Source $init$$$anonfun$8() {
        return publishMergeSinkSource();
    }
}
