package it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.master;

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.Props;
import akka.actor.Props$;
import akka.cluster.UniqueAddress;
import akka.pattern.Patterns$;
import akka.util.Timeout$;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkBatchReader;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkReaderFactory;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkStructuredStreamingReader;
import it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.pipegraph.PipegraphGuardian;
import it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.pipegraph.PipegraphGuardian$;
import it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.watchdog.SparkContextWatchDog$;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkStructuredStreamingWriter;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkWriterFactory;
import it.agilelab.bigdata.wasp.core.messages.PipegraphMessages;
import it.agilelab.bigdata.wasp.models.PipegraphInstanceModel;
import it.agilelab.bigdata.wasp.models.PipegraphModel;
import it.agilelab.bigdata.wasp.models.ReaderModel;
import it.agilelab.bigdata.wasp.models.StreamingReaderModel;
import it.agilelab.bigdata.wasp.models.StructuredStreamingETLModel;
import it.agilelab.bigdata.wasp.models.WriterModel;
import it.agilelab.bigdata.wasp.repository.core.bl.FreeCodeBL;
import it.agilelab.bigdata.wasp.repository.core.bl.MlModelBL;
import it.agilelab.bigdata.wasp.repository.core.bl.PipegraphBL;
import it.agilelab.bigdata.wasp.repository.core.bl.ProcessGroupBL;
import it.agilelab.bigdata.wasp.repository.core.bl.TopicBL;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Function1;
import scala.Function3;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.PartialFunction$;
import scala.Unit$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.mutable.ListBuffer$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.runtime.StructuralCallSite;

/* compiled from: SparkConsumersStreamingMasterGuardian.scala */
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/streaming/actor/master/SparkConsumersStreamingMasterGuardian$.class */
public final class SparkConsumersStreamingMasterGuardian$ {
    /** Singleton instance of this module; assigned by the private constructor of this class. */
    public static SparkConsumersStreamingMasterGuardian$ MODULE$;

    static {
        // Instantiating the class once is enough: the constructor stores `this` into MODULE$.
        new SparkConsumersStreamingMasterGuardian$();
    }

    /**
     * Resolves the no-argument-list accessor named {@code mlModelBL} on {@code cls} through a
     * {@link StructuralCallSite}, reusing the method the call site already knows for that class.
     *
     * NOTE(review): in this decompiled form the call site is bootstrapped on every invocation
     * (the original bytecode uses invoke-custom) — confirm how long the lookup cache really lives.
     *
     * @param cls runtime class on which the accessor is looked up
     * @return the accessor, made accessible via {@code ScalaRunTime.ensureAccessible}
     */
    public static Method reflMethod$Method1(Class cls) {
        StructuralCallSite callSite = (StructuralCallSite) StructuralCallSite.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(StructuralCallSite.class), MethodType.methodType(Object.class)).dynamicInvoker().invoke() /* invoke-custom */;
        Method known = callSite.find(cls);
        if (known == null) {
            // Not seen for this class yet: resolve it, open it up and remember it on the call site.
            Method resolved = ScalaRunTime$.MODULE$.ensureAccessible(cls.getMethod("mlModelBL", callSite.parameterTypes()));
            callSite.add(cls, resolved);
            return resolved;
        }
        return known;
    }

    /**
     * Resolves the accessor named {@code topicBL} on {@code cls} through a
     * {@link StructuralCallSite}, reusing the method the call site already knows for that class.
     *
     * NOTE(review): in this decompiled form the call site is bootstrapped on every invocation
     * (the original bytecode uses invoke-custom) — confirm how long the lookup cache really lives.
     *
     * @param cls runtime class on which the accessor is looked up
     * @return the accessor, made accessible via {@code ScalaRunTime.ensureAccessible}
     */
    public static Method reflMethod$Method2(Class cls) {
        StructuralCallSite callSite = (StructuralCallSite) StructuralCallSite.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(StructuralCallSite.class), MethodType.methodType(Object.class)).dynamicInvoker().invoke() /* invoke-custom */;
        Method known = callSite.find(cls);
        if (known == null) {
            // Not seen for this class yet: resolve it, open it up and remember it on the call site.
            Method resolved = ScalaRunTime$.MODULE$.ensureAccessible(cls.getMethod("topicBL", callSite.parameterTypes()));
            callSite.add(cls, resolved);
            return resolved;
        }
        return known;
    }

    /**
     * Resolves the accessor named {@code freeCodeBL} on {@code cls} through a
     * {@link StructuralCallSite}, reusing the method the call site already knows for that class.
     *
     * NOTE(review): in this decompiled form the call site is bootstrapped on every invocation
     * (the original bytecode uses invoke-custom) — confirm how long the lookup cache really lives.
     *
     * @param cls runtime class on which the accessor is looked up
     * @return the accessor, made accessible via {@code ScalaRunTime.ensureAccessible}
     */
    public static Method reflMethod$Method3(Class cls) {
        StructuralCallSite callSite = (StructuralCallSite) StructuralCallSite.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(StructuralCallSite.class), MethodType.methodType(Object.class)).dynamicInvoker().invoke() /* invoke-custom */;
        Method known = callSite.find(cls);
        if (known == null) {
            // Not seen for this class yet: resolve it, open it up and remember it on the call site.
            Method resolved = ScalaRunTime$.MODULE$.ensureAccessible(cls.getMethod("freeCodeBL", callSite.parameterTypes()));
            callSite.add(cls, resolved);
            return resolved;
        }
        return known;
    }

    /**
     * Resolves the accessor named {@code processGroupBL} on {@code cls} through a
     * {@link StructuralCallSite}, reusing the method the call site already knows for that class.
     *
     * NOTE(review): in this decompiled form the call site is bootstrapped on every invocation
     * (the original bytecode uses invoke-custom) — confirm how long the lookup cache really lives.
     *
     * @param cls runtime class on which the accessor is looked up
     * @return the accessor, made accessible via {@code ScalaRunTime.ensureAccessible}
     */
    public static Method reflMethod$Method4(Class cls) {
        StructuralCallSite callSite = (StructuralCallSite) StructuralCallSite.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(StructuralCallSite.class), MethodType.methodType(Object.class)).dynamicInvoker().invoke() /* invoke-custom */;
        Method known = callSite.find(cls);
        if (known == null) {
            // Not seen for this class yet: resolve it, open it up and remember it on the call site.
            Method resolved = ScalaRunTime$.MODULE$.ensureAccessible(cls.getMethod("processGroupBL", callSite.parameterTypes()));
            callSite.add(cls, resolved);
            return resolved;
        }
        return known;
    }

    /**
     * Builds a creator function that spawns a watchdog actor from
     * {@code SparkContextWatchDog.exitingWatchdogProps}.
     *
     * @param sparkContext Spark context forwarded to the watchdog props
     * @param i            integer forwarded as the second argument of {@code exitingWatchdogProps}
     *                     (NOTE(review): presumably the process exit code — confirm)
     * @return a function of (actor ref, actor name, actor factory); the actor ref is not used,
     *         the watchdog is created on the factory under the given name
     */
    public Function3<ActorRef, String, ActorRefFactory, ActorRef> exitingWatchdogCreator(SparkContext sparkContext, int i) {
        return (unusedRef, watchdogName, factory) ->
                factory.actorOf(
                        SparkContextWatchDog$.MODULE$.exitingWatchdogProps(sparkContext, i),
                        watchdogName);
    }

    /**
     * Builds a creator function that spawns a watchdog actor from
     * {@code SparkContextWatchDog.logAndDoNothingWatchdogProps}.
     *
     * @param sparkContext Spark context forwarded to the watchdog props
     * @return a function of (actor ref, actor name, actor factory); the actor ref is not used,
     *         the watchdog is created on the factory under the given name
     */
    public Function3<ActorRef, String, ActorRefFactory, ActorRef> doNothingWatchdogCreator(SparkContext sparkContext) {
        return (unusedRef, watchdogName, factory) ->
                factory.actorOf(
                        SparkContextWatchDog$.MODULE$.logAndDoNothingWatchdogProps(sparkContext),
                        watchdogName);
    }

    /**
     * Builds the default creator function for {@code PipegraphGuardian} child actors.
     *
     * <p>The returned function wires reader/writer creators around the supplied factories, reads
     * the {@code mlModelBL}, {@code topicBL}, {@code freeCodeBL} and {@code processGroupBL}
     * accessors reflectively from {@code obj}, and creates the guardian on the given actor factory.
     * The actor name is the requested name suffixed with a random UUID, with spaces replaced by
     * {@code -} and then URL-encoded as UTF-8; the un-suffixed name is what is passed to the
     * guardian props.
     *
     * @param sparkSession       session handed to {@code PipegraphGuardian.defaultChildFactory}
     * @param sparkReaderFactory factory used for streaming and batch readers
     * @param sparkWriterFactory factory used for structured streaming writers
     * @param finiteDuration     first duration forwarded to {@code PipegraphGuardian.props}
     * @param finiteDuration2    second duration forwarded to {@code PipegraphGuardian.props}
     * @param function1          maps an ETL model to a {@code PipegraphGuardian.Choice}
     * @param obj                environment object exposing the four BL accessors (structural type)
     * @return a function of (parent actor ref, pipegraph name, actor factory) yielding the new actor
     */
    public Function3<ActorRef, String, ActorRefFactory, ActorRef> defaultChildCreator(SparkSession sparkSession, SparkReaderFactory sparkReaderFactory, SparkWriterFactory sparkWriterFactory, FiniteDuration finiteDuration, FiniteDuration finiteDuration2, Function1<StructuredStreamingETLModel, PipegraphGuardian.Choice> function1, Object obj) {
        return (parent, pipegraphName, factory) -> {
            // A random suffix keeps actor names distinct across repeated creations of the same pipegraph.
            String uniqueName = pipegraphName + "-" + UUID.randomUUID();
            Function3<StructuredStreamingETLModel, StreamingReaderModel, SparkSession, Option<SparkStructuredStreamingReader>> streamingReaderCreator =
                    (etl, streamingSource, session) ->
                            sparkReaderFactory.createSparkStructuredStreamingReader(obj, session, etl, streamingSource);
            Function3<StructuredStreamingETLModel, ReaderModel, SparkSession, Option<SparkBatchReader>> batchReaderCreator =
                    (etl, batchSource, session) ->
                            sparkReaderFactory.createSparkBatchReader(obj, session.sparkContext(), batchSource);
            Function3<StructuredStreamingETLModel, WriterModel, SparkSession, Option<SparkStructuredStreamingWriter>> writerCreator =
                    (etl, sink, session) ->
                            sparkWriterFactory.createSparkWriterStructuredStreaming(obj, session, etl, sink);
            // The handler nesting is kept exactly as decompiled: every level rethrows the cause of an
            // InvocationTargetException, so an enclosing handler would unwrap a cause that is itself one.
            try {
                try {
                    try {
                        try {
                            Function3<PipegraphModel, String, ActorRefFactory, ActorRef> guardianChildFactory = PipegraphGuardian$.MODULE$.defaultChildFactory(
                                    sparkSession,
                                    (MlModelBL) reflMethod$Method1(obj.getClass()).invoke(obj, new Object[0]),
                                    (TopicBL) reflMethod$Method2(obj.getClass()).invoke(obj, new Object[0]),
                                    (FreeCodeBL) reflMethod$Method3(obj.getClass()).invoke(obj, new Object[0]),
                                    (ProcessGroupBL) reflMethod$Method4(obj.getClass()).invoke(obj, new Object[0]),
                                    streamingReaderCreator,
                                    batchReaderCreator,
                                    writerCreator);
                            return factory.actorOf(
                                    PipegraphGuardian$.MODULE$.props(parent, pipegraphName, guardianChildFactory, finiteDuration, finiteDuration2, function1),
                                    URLEncoder.encode(uniqueName.replaceAll(" ", "-"), StandardCharsets.UTF_8.name()));
                        } catch (InvocationTargetException first) {
                            throw first.getCause();
                        }
                    } catch (InvocationTargetException second) {
                        throw second.getCause();
                    }
                } catch (InvocationTargetException third) {
                    throw third.getCause();
                }
            } catch (InvocationTargetException fourth) {
                throw fourth.getCause();
            }
        };
    }

    /**
     * Creates the {@link Props} for a {@code SparkConsumersStreamingMasterGuardian} actor.
     *
     * <p>All arguments are forwarded to the actor constructor from inside the creator thunk;
     * {@code schedulingStrategyFactory.create()} is therefore called when the thunk runs, not
     * when this method is called.
     *
     * @param pipegraphBL               first constructor argument
     * @param function3                 child creator passed to the actor
     * @param str                       string constructor argument
     * @param finiteDuration            first duration constructor argument
     * @param finiteDuration2           second duration constructor argument
     * @param option                    optional actor ref constructor argument
     * @param schedulingStrategyFactory factory asked for the scheduling strategy
     * @return props describing how to instantiate the actor
     */
    public Props props(PipegraphBL pipegraphBL, Function3<ActorRef, String, ActorRefFactory, ActorRef> function3, String str, FiniteDuration finiteDuration, FiniteDuration finiteDuration2, Option<ActorRef> option, SchedulingStrategyFactory schedulingStrategyFactory) {
        return Props$.MODULE$.apply(
                () -> new SparkConsumersStreamingMasterGuardian(
                        pipegraphBL,
                        function3,
                        str,
                        finiteDuration,
                        finiteDuration2,
                        option,
                        schedulingStrategyFactory.create()),
                ClassTag$.MODULE$.apply(SparkConsumersStreamingMasterGuardian.class));
    }

    /**
     * Default value accessor for the 6th parameter of {@code props} (compiler-generated naming).
     *
     * @return {@code None}, i.e. no actor ref
     */
    public Option<ActorRef> props$default$6() {
        return None$.MODULE$;
    }

    /**
     * Default value accessor for the 7th parameter of {@code props} (compiler-generated naming).
     *
     * @return a newly constructed {@code FifoSchedulingStrategyFactory} on every call
     */
    public SchedulingStrategyFactory props$default$7() {
        return new FifoSchedulingStrategyFactory();
    }

    /**
     * Combines a sequence of partial functions into one by left-folding with {@code orElse},
     * starting from the empty partial function.
     *
     * @param seq partial functions to chain, in the order they should be chained
     * @return the combined partial function; for an empty sequence, the empty partial function
     */
    public <A, B> PartialFunction<A, B> it$agilelab$bigdata$wasp$consumers$spark$streaming$actor$master$SparkConsumersStreamingMasterGuardian$$compose(Seq<PartialFunction<A, B>> seq) {
        return (PartialFunction) seq.foldLeft(
                PartialFunction$.MODULE$.empty(),
                (combined, next) -> combined.orElse(next));
    }

    /**
     * Applies an asynchronous function to every element, one after another, and collects the
     * results in encounter order.
     *
     * <p>The fold chains each application onto the future of the previous one via {@code flatMap},
     * so {@code function1} is applied to an element only from within the callback of the
     * preceding step.
     *
     * @param traversableOnce  elements to process
     * @param function1        asynchronous operation applied to each element
     * @param executionContext context on which the chained callbacks run
     * @return a future of the immutable list of results
     */
    private <T, U> Future<List<U>> sequenceFutures(TraversableOnce<T> traversableOnce, Function1<T, Future<U>> function1, ExecutionContext executionContext) {
        // Distinct names for the element and its result: the decompiled code reused `obj` for both,
        // which redeclared the enclosing lambda's parameter.
        return ((Future) traversableOnce.foldLeft(
                Future$.MODULE$.successful(ListBuffer$.MODULE$.empty()),
                (pending, element) -> pending.flatMap(
                        collected -> ((Future) function1.apply(element)).map(
                                result -> collected.$plus$eq(result),
                                executionContext),
                        executionContext)))
                .map(buffer -> buffer.toList(), executionContext);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks {@code actorRef} to stop the pipegraph named {@code str} and interprets the reply.
     *
     * <p>A {@code PipegraphStopped} reply carrying the same name completes with that name. A
     * {@code PipegraphNotStopped} reply carrying the same name triggers another ask with the same
     * arguments; no delay or retry limit is applied here. Any other reply makes the callback
     * throw {@code Exception("unexpected result")}.
     *
     * @param actorRef         actor that receives the {@code StopPipegraph} message
     * @param str              pipegraph name
     * @param finiteDuration   ask timeout
     * @param executionContext context on which the reply callback runs
     * @return a future of the stopped pipegraph's name
     */
    public Future<String> askToStop(ActorRef actorRef, String str, FiniteDuration finiteDuration, ExecutionContext executionContext) {
        return Patterns$.MODULE$
                .ask(actorRef, Protocol$.MODULE$.StopPipegraph().apply(str), Timeout$.MODULE$.durationToTimeout(finiteDuration))
                .flatMap(reply -> {
                    if (reply instanceof PipegraphMessages.PipegraphStopped) {
                        String stoppedName = ((PipegraphMessages.PipegraphStopped) reply).name();
                        boolean sameName = str == null ? stoppedName == null : str.equals(stoppedName);
                        if (sameName) {
                            return Future$.MODULE$.successful(str);
                        }
                    }
                    if (reply instanceof PipegraphMessages.PipegraphNotStopped) {
                        String refusedName = ((PipegraphMessages.PipegraphNotStopped) reply).name();
                        boolean sameName = str == null ? refusedName == null : str.equals(refusedName);
                        if (sameName) {
                            // Not stopped yet: ask again with identical arguments.
                            return MODULE$.askToStop(actorRef, str, finiteDuration, executionContext);
                        }
                    }
                    throw new Exception("unexpected result");
                }, executionContext);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks {@code actorRef} to start the pipegraph named {@code str} and interprets the reply.
     *
     * <p>A {@code PipegraphStarted} reply carrying the same name completes with that name. A
     * {@code PipegraphNotStarted} reply carrying the same name triggers another ask with the same
     * arguments; no delay or retry limit is applied here. Any other reply makes the callback
     * throw {@code Exception("unexpected result")}.
     *
     * @param actorRef         actor that receives the {@code StartPipegraph} message
     * @param str              pipegraph name
     * @param finiteDuration   ask timeout
     * @param executionContext context on which the reply callback runs
     * @return a future of the started pipegraph's name
     */
    public Future<String> askToStart(ActorRef actorRef, String str, FiniteDuration finiteDuration, ExecutionContext executionContext) {
        return Patterns$.MODULE$
                .ask(actorRef, Protocol$.MODULE$.StartPipegraph().apply(str), Timeout$.MODULE$.durationToTimeout(finiteDuration))
                .flatMap(reply -> {
                    if (reply instanceof PipegraphMessages.PipegraphStarted) {
                        String startedName = ((PipegraphMessages.PipegraphStarted) reply).name();
                        boolean sameName = str == null ? startedName == null : str.equals(startedName);
                        if (sameName) {
                            return Future$.MODULE$.successful(str);
                        }
                    }
                    if (reply instanceof PipegraphMessages.PipegraphNotStarted) {
                        String refusedName = ((PipegraphMessages.PipegraphNotStarted) reply).name();
                        boolean sameName = str == null ? refusedName == null : str.equals(refusedName);
                        if (sameName) {
                            // Not started yet: ask again with identical arguments.
                            return MODULE$.askToStart(actorRef, str, finiteDuration, executionContext);
                        }
                    }
                    throw new Exception("unexpected result");
                }, executionContext);
    }

    /**
     * Stops the named pipegraphs through {@code sequenceFutures}, issuing one {@code askToStop}
     * per name.
     *
     * @param actorRef         actor that receives the stop requests
     * @param seq              pipegraph names
     * @param finiteDuration   ask timeout used for every request
     * @param executionContext context on which callbacks run
     * @return a future of the names returned by the individual stop requests
     */
    private Future<Seq<String>> askToStopSeq(ActorRef actorRef, Seq<String> seq, FiniteDuration finiteDuration, ExecutionContext executionContext) {
        return sequenceFutures(
                seq,
                pipegraphName -> MODULE$.askToStop(actorRef, pipegraphName, finiteDuration, executionContext),
                executionContext);
    }

    /**
     * Starts the named pipegraphs through {@code sequenceFutures}, issuing one {@code askToStart}
     * per name.
     *
     * @param actorRef         actor that receives the start requests
     * @param seq              pipegraph names
     * @param finiteDuration   ask timeout used for every request
     * @param executionContext context on which callbacks run
     * @return a future of the names returned by the individual start requests
     */
    public Future<Seq<String>> it$agilelab$bigdata$wasp$consumers$spark$streaming$actor$master$SparkConsumersStreamingMasterGuardian$$askToStartSeq(ActorRef actorRef, Seq<String> seq, FiniteDuration finiteDuration, ExecutionContext executionContext) {
        return sequenceFutures(
                seq,
                pipegraphName -> MODULE$.askToStart(actorRef, pipegraphName, finiteDuration, executionContext),
                executionContext);
    }

    public Future<BoxedUnit> it$agilelab$bigdata$wasp$consumers$spark$streaming$actor$master$SparkConsumersStreamingMasterGuardian$$orderlyRestart(ActorRef actorRef, Seq<PipegraphInstanceModel> seq, FiniteDuration finiteDuration, ExecutionContext executionContext) {
        return askToStopSeq(actorRef, (Seq) seq.map(pipegraphInstanceModel -> {
            return pipegraphInstanceModel.instanceOf();
        }, Seq$.MODULE$.canBuildFrom()), finiteDuration, executionContext).flatMap(seq2 -> {
            return MODULE$.it$agilelab$bigdata$wasp$consumers$spark$streaming$actor$master$SparkConsumersStreamingMasterGuardian$$askToStartSeq(actorRef, seq2, finiteDuration, executionContext);
        }, executionContext).map(seq3 -> {
            Unit$.MODULE$;
            return BoxedUnit.UNIT;
        }, executionContext);
    }

    /**
     * Renders a cluster unique address as {@code <address>/<longUid>}.
     *
     * @param uniqueAddress address to format
     * @return the string form of {@code address()} followed by {@code /} and {@code longUid()}
     */
    public String formatUniqueAddress(UniqueAddress uniqueAddress) {
        String addressText = uniqueAddress.address().toString();
        return addressText + "/" + uniqueAddress.longUid();
    }

    /** Private constructor: publishes the newly created instance through the static {@code MODULE$} field. */
    private SparkConsumersStreamingMasterGuardian$() {
        MODULE$ = this;
    }
}
