package io.mantisrx.connector.job.source;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.mantisrx.common.utils.Closeables;
import io.mantisrx.client.MantisSSEJob;
import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.connector.job.core.AbstractSourceJobSource;
import io.mantisrx.connector.job.core.DefaultSinkConnectionStatusObserver;
import io.mantisrx.connector.job.core.MantisSourceJobConnector;
import io.mantisrx.connector.job.core.MultiSinkConnectionStatusObserver;
import io.mantisrx.connector.job.core.SinkConnectionStatusObserver;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/* loaded from: input_file:io/mantisrx/connector/job/source/JobSource.class */
public class JobSource extends AbstractSourceJobSource implements Source<MantisServerSentEvent> {
    /** Logger used by the connection and target-parsing code of this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(JobSource.class);

    /** Gson parser used by {@code parseTargetInfo}; it is never reassigned, hence {@code final}. */
    private static final JsonParser parser = new JsonParser();

    /**
     * Source-job targets this instance connects to. Set by the constructors and replaced inside
     * {@code call} (parsed from the job parameters when empty, then client-id normalized).
     */
    protected List<TargetInfo> targets;

    /** Source-job clients created by {@code call}; closed and cleared by {@code close}. */
    private final List<MantisSSEJob> jobs;

    /**
     * Describes one source job to subscribe to, together with the query and the connection
     * options used for that subscription.
     *
     * <p>All fields are public and mutable.
     *
     * @deprecated marked deprecated in this file; NOTE(review): the intended replacement is not
     *     visible here.
     */
    @Deprecated
    public static class TargetInfo {
        /** Name of the source job to connect to. */
        public String sourceJobName;
        /** Query (criterion) sent to the source job. */
        public String criterion;
        /** Requested samples per second; {@code -1} is the value used when none is configured. */
        public int samplePerSec;
        /** Whether this target is consumed in broadcast mode. */
        public boolean isBroadcastMode;
        /** Whether meta messages are requested from the source job. */
        public boolean enableMetaMessages;
        /** Whether compressed binary transport is requested from the source job. */
        public boolean enableCompressedBinary;
        /** Client id used for the subscription; may be {@code null} until one is assigned. */
        public String clientId;
        /** Extra parameters forwarded with the subscription; may be {@code null}. */
        public Map<String, String> additionalParams;

        /**
         * Creates a target without additional parameters; {@link #additionalParams} is left
         * {@code null}.
         *
         * @param sourceJobName name of the source job
         * @param criterion query sent to the source job
         * @param clientId client id for the subscription, may be {@code null}
         * @param samplePerSec requested samples per second
         * @param isBroadcastMode whether broadcast mode is used
         * @param enableMetaMessages whether meta messages are requested
         * @param enableCompressedBinary whether compressed binary transport is requested
         */
        public TargetInfo(
                String sourceJobName,
                String criterion,
                String clientId,
                int samplePerSec,
                boolean isBroadcastMode,
                boolean enableMetaMessages,
                boolean enableCompressedBinary) {
            this(
                    sourceJobName,
                    criterion,
                    clientId,
                    samplePerSec,
                    isBroadcastMode,
                    enableMetaMessages,
                    enableCompressedBinary,
                    null);
        }

        /**
         * Creates a fully specified target.
         *
         * @param sourceJobName name of the source job
         * @param criterion query sent to the source job
         * @param clientId client id for the subscription, may be {@code null}
         * @param samplePerSec requested samples per second
         * @param isBroadcastMode whether broadcast mode is used
         * @param enableMetaMessages whether meta messages are requested
         * @param enableCompressedBinary whether compressed binary transport is requested
         * @param additionalParams extra parameters, stored by reference (not copied); may be
         *     {@code null}
         */
        public TargetInfo(
                String sourceJobName,
                String criterion,
                String clientId,
                int samplePerSec,
                boolean isBroadcastMode,
                boolean enableMetaMessages,
                boolean enableCompressedBinary,
                Map<String, String> additionalParams) {
            this.sourceJobName = sourceJobName;
            this.criterion = criterion;
            this.clientId = clientId;
            this.samplePerSec = samplePerSec;
            this.isBroadcastMode = isBroadcastMode;
            this.enableMetaMessages = enableMetaMessages;
            this.enableCompressedBinary = enableCompressedBinary;
            this.additionalParams = additionalParams;
        }
    }

    /**
     * Fluent builder for {@link TargetInfo}.
     *
     * <p>Defaults when a setter is not called: {@code samplePerSec = -1}, all boolean flags
     * {@code false}, an empty mutable map of additional parameters, and {@code null} for the
     * source job name, query and client id.
     *
     * @deprecated marked deprecated in this file; NOTE(review): the intended replacement is not
     *     visible here.
     */
    @Deprecated
    public static class TargetInfoBuilder {
        private String sourceJobName;
        private String criterion;
        private String clientId;
        private int samplePerSec = -1;
        private boolean isBroadcastMode = false;
        private boolean enableMetaMessages = false;
        private boolean enableCompressedBinary = false;
        private Map<String, String> additionalParams = new HashMap<>();

        /**
         * @param sourceJobName name of the source job to connect to
         * @return this builder
         */
        public TargetInfoBuilder withSourceJobName(String sourceJobName) {
            this.sourceJobName = sourceJobName;
            return this;
        }

        /**
         * @param query query stored as the target's {@code criterion}
         * @return this builder
         */
        public TargetInfoBuilder withQuery(String query) {
            this.criterion = query;
            return this;
        }

        /**
         * @param samplePerSec requested samples per second
         * @return this builder
         */
        public TargetInfoBuilder withSamplePerSec(int samplePerSec) {
            this.samplePerSec = samplePerSec;
            return this;
        }

        /**
         * Turns broadcast mode on; there is no way to turn it off again on this builder.
         *
         * @return this builder
         */
        public TargetInfoBuilder withBroadCastMode() {
            this.isBroadcastMode = true;
            return this;
        }

        /**
         * Requests meta messages.
         *
         * @return this builder
         */
        public TargetInfoBuilder withMetaMessagesEnabled() {
            this.enableMetaMessages = true;
            return this;
        }

        /**
         * Requests compressed binary transport.
         *
         * @return this builder
         */
        public TargetInfoBuilder withBinaryCompressionEnabled() {
            this.enableCompressedBinary = true;
            return this;
        }

        /**
         * @param clientId client id for the subscription
         * @return this builder
         */
        public TargetInfoBuilder withClientId(String clientId) {
            this.clientId = clientId;
            return this;
        }

        /**
         * Replaces the additional parameters; the map is stored by reference, and passing
         * {@code null} makes the built target carry {@code null}.
         *
         * @param additionalParams extra parameters for the subscription
         * @return this builder
         */
        public TargetInfoBuilder withAdditionalParams(Map<String, String> additionalParams) {
            this.additionalParams = additionalParams;
            return this;
        }

        /**
         * @return a new {@link TargetInfo} populated from the current builder state
         */
        public TargetInfo build() {
            return new TargetInfo(
                    this.sourceJobName,
                    this.criterion,
                    this.clientId,
                    this.samplePerSec,
                    this.isBroadcastMode,
                    this.enableMetaMessages,
                    this.enableCompressedBinary,
                    this.additionalParams);
        }
    }

    /**
     * Creates a source for the given targets.
     *
     * @param targets targets to connect to; stored by reference (not copied). When the list is
     *     empty, {@code call} parses the targets from the job parameters instead.
     */
    public JobSource(List<TargetInfo> targets) {
        this.jobs = new ArrayList<>();
        this.targets = targets;
    }

    /** Creates a source with no preset targets. */
    public JobSource() {
        this(new ArrayList<>());
    }

    /**
     * Creates a source whose targets are parsed from a JSON description.
     *
     * @param targetInfoJson JSON text handed to {@code parseTargetInfo}
     */
    public JobSource(String targetInfoJson) {
        this(parseTargetInfo(targetInfoJson));
    }

    /**
     * Declares the job parameters this source reads.
     *
     * @return a mutable list holding one string parameter, keyed by
     *     {@code MantisSourceJobConnector.MANTIS_SOURCEJOB_TARGET_KEY}, validated as not null or
     *     empty, with default value {@code "{}"}
     */
    public List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> parameters = Lists.newArrayList();
        parameters.add(
                new StringParameter()
                        .name(MantisSourceJobConnector.MANTIS_SOURCEJOB_TARGET_KEY)
                        .validator(Validators.notNullOrEmpty())
                        .defaultValue("{}")
                        .build());
        return parameters;
    }

    /**
     * Connects to every configured source job and merges their event streams.
     *
     * <p>When no targets were preset, they are parsed from the job parameters. Client ids are then
     * normalized via {@code enforceClientIdConsistency}, using the job id as the fallback client
     * id. For each target a status observer is registered, a client is obtained from
     * {@code getSourceJob} (not defined in this file; presumably inherited), remembered in
     * {@code jobs} so that {@code close} can release it, and connected.
     *
     * <p>A target whose client or connection is {@code null} is logged and skipped, whatever its
     * position in the list.
     *
     * @param context job context supplying the parameters, the worker number and the job id
     * @param index not used by this implementation
     * @return the merged stream of all successful connections, or {@code null} when there is no
     *     target or no connection succeeded
     */
    public Observable<Observable<MantisServerSentEvent>> call(Context context, Index index) {
        if (this.targets.isEmpty()) {
            this.targets = parseInputParameters(context);
        }
        int workerNumber = context.getWorkerInfo().getWorkerNumber();
        this.targets = enforceClientIdConsistency(this.targets, context.getJobId());

        Observable<Observable<MantisServerSentEvent>> merged = null;
        for (TargetInfo target : this.targets) {
            String sourceJobName = target.sourceJobName;
            LOGGER.info("Processing job {}", sourceJobName);

            SinkConnectionStatusObserver statusObserver = DefaultSinkConnectionStatusObserver.getInstance(false);
            MultiSinkConnectionStatusObserver.INSTANCE.addSinkConnectionObserver(sourceJobName, statusObserver);

            // In broadcast mode each worker subscribes under its own client id.
            String clientId = target.isBroadcastMode
                    ? target.clientId + "_" + workerNumber
                    : target.clientId;

            MantisSSEJob sourceJob = getSourceJob(
                    sourceJobName,
                    target.criterion,
                    clientId,
                    target.samplePerSec,
                    target.enableMetaMessages,
                    target.enableCompressedBinary,
                    statusObserver,
                    target.additionalParams,
                    Optional.empty());
            if (sourceJob == null) {
                LOGGER.error("Could not connect to job {}", sourceJobName);
                continue;
            }
            // Track the client before connecting so close() releases it even if connecting fails.
            this.jobs.add(sourceJob);

            Observable<Observable<MantisServerSentEvent>> connection = sourceJob.connectAndGet();
            if (connection == null) {
                LOGGER.error("Could not connect to job {}", sourceJobName);
                continue;
            }
            merged = (merged == null) ? connection : merged.mergeWith(connection);
        }
        return merged;
    }

    /**
     * Closes the source-job clients accumulated in {@code jobs} and then empties that list.
     *
     * <p>The list is cleared in a {@code finally} block, so it is emptied even when closing
     * throws.
     *
     * @throws IOException if closing the combined jobs fails
     */
    public void close() throws IOException {
        try {
            // NOTE(review): Closeables.combine presumably wraps all jobs in a single Closeable
            // that closes each of them — confirm against the helper.
            Closeables.combine(this.jobs).close();
        } finally {
            this.jobs.clear();
        }
    }

    /**
     * Reads the target description from the job parameters and parses it.
     *
     * @param context job context whose parameters are read under
     *     {@code MantisSourceJobConnector.MANTIS_SOURCEJOB_TARGET_KEY}, falling back to
     *     {@code "{}"}
     * @return the targets produced by {@code parseTargetInfo} for that value
     */
    protected static List<TargetInfo> parseInputParameters(Context context) {
        Object rawTargetInfo =
                context.getParameters().get(MantisSourceJobConnector.MANTIS_SOURCEJOB_TARGET_KEY, "{}");
        return parseTargetInfo((String) rawTargetInfo);
    }

    /**
     * Parses a JSON target description of the form {@code {"targets": [ {...}, ... ]}} into
     * {@link TargetInfo} objects.
     *
     * <p>Per target object, the source job name and the criterion are required (a missing one
     * fails with a {@link NullPointerException}). The client id defaults to {@code null}, the
     * sample rate to {@code -1}, and the broadcast and meta-message flags to {@code false}.
     * Every member of the target object, including the ones listed above, is also copied as a
     * string into the target's additional parameters; a warning is logged for each of the
     * well-known keys copied that way.
     *
     * <p>NOTE(review): compressed binary is enabled whenever the key
     * {@code "mantis.EnableCompressedBinary"} is present, regardless of its value. This differs
     * from the other flags — confirm that it is intended.
     *
     * @param str JSON text describing the targets
     * @return a mutable list of parsed targets; empty when the JSON object has no
     *     {@code "targets"} member (as with the parameter default {@code "{}"})
     * @deprecated marked deprecated in this file; NOTE(review): the intended replacement is not
     *     visible here.
     */
    @Deprecated
    protected static List<TargetInfo> parseTargetInfo(String str) {
        List<TargetInfo> targetInfos = new ArrayList<>();

        JsonObject root = parser.parse(str).getAsJsonObject();
        JsonElement targetsElement = root.get("targets");
        if (targetsElement == null) {
            // Treat a missing member like an empty array instead of failing with an NPE.
            LOGGER.warn("No targets found in source job target info");
            return targetInfos;
        }

        for (JsonElement targetElement : targetsElement.getAsJsonArray()) {
            JsonObject target = targetElement.getAsJsonObject();

            String sourceJobName = target.get(MantisSourceJobConnector.MANTIS_SOURCEJOB_NAME_PARAM).getAsString();
            String criterion = target.get(MantisSourceJobConnector.MANTIS_SOURCEJOB_CRITERION).getAsString();

            JsonElement clientIdElement = target.get("clientId");
            String clientId = clientIdElement != null ? clientIdElement.getAsString() : null;

            JsonElement sampleElement = target.get(MantisSourceJobConnector.MANTIS_SOURCEJOB_SAMPLE_PER_SEC_KEY);
            int samplePerSec = sampleElement != null ? sampleElement.getAsInt() : -1;

            JsonElement broadcastElement = target.get(MantisSourceJobConnector.MANTIS_SOURCEJOB_IS_BROADCAST_MODE);
            boolean isBroadcastMode = broadcastElement != null && broadcastElement.getAsBoolean();

            JsonElement metaElement = target.get(MantisSourceJobConnector.MANTIS_ENABLE_META_MESSAGES);
            boolean enableMetaMessages = metaElement != null && metaElement.getAsBoolean();

            // Presence of the key alone enables compressed binary (see the note in the Javadoc).
            boolean enableCompressedBinary = target.get("mantis.EnableCompressedBinary") != null;

            Map<String, String> additionalParams = new HashMap<>();
            for (Map.Entry<String, JsonElement> entry : target.entrySet()) {
                String key = entry.getKey();
                if (isWellKnownTargetKey(key)) {
                    LOGGER.warn("Overwriting key {} in additionalParams", key);
                }
                additionalParams.put(key, entry.getValue().getAsString());
            }

            targetInfos.add(new TargetInfo(
                    sourceJobName,
                    criterion,
                    clientId,
                    samplePerSec,
                    isBroadcastMode,
                    enableMetaMessages,
                    enableCompressedBinary,
                    additionalParams));
            LOGGER.info("sname: {} criterion: {} isBroadcastMode {}", sourceJobName, criterion, isBroadcastMode);
        }
        return targetInfos;
    }

    /** Returns whether {@code key} is one of the target members that map to a dedicated field. */
    private static boolean isWellKnownTargetKey(String key) {
        return key.equals(MantisSourceJobConnector.MANTIS_SOURCEJOB_NAME_PARAM)
                || key.equals(MantisSourceJobConnector.MANTIS_SOURCEJOB_CRITERION)
                || key.equals("clientId")
                || key.equals(MantisSourceJobConnector.MANTIS_SOURCEJOB_SAMPLE_PER_SEC_KEY)
                || key.equals(MantisSourceJobConnector.MANTIS_SOURCEJOB_IS_BROADCAST_MODE)
                || key.equals(MantisSourceJobConnector.MANTIS_ENABLE_META_MESSAGES)
                || key.equals("mantis.EnableCompressedBinary");
    }

    /**
     * Gives every target a client id that is unique per source job name.
     *
     * <p>The list is sorted in place by {@code criterion} (targets with a {@code null} criterion
     * sort first). Targets without a client id receive {@code defaultClientId}. When a
     * (source job name, client id) pair was already used by an earlier target in sorted order,
     * the suffix {@code _1}, {@code _2}, ... is appended until the pair is unused. The sort
     * presumably exists to make the suffix assignment independent of the input order.
     *
     * @param targets targets to normalize; sorted in place and their {@code clientId} fields are
     *     overwritten, so the list must be modifiable
     * @param defaultClientId client id given to targets that have none
     * @return the same {@code targets} list instance
     * @deprecated marked deprecated in this file; NOTE(review): the intended replacement is not
     *     visible here.
     */
    @Deprecated
    public static List<TargetInfo> enforceClientIdConsistency(List<TargetInfo> targets, String defaultClientId) {
        // nullsFirst: a target built without a query must not make the sort throw.
        targets.sort(Comparator.comparing(
                (TargetInfo target) -> target.criterion,
                Comparator.nullsFirst(Comparator.<String>naturalOrder())));

        HashSet<Tuple2<String, String>> usedIds = new HashSet<>(targets.size());
        for (TargetInfo target : targets) {
            if (target.clientId == null) {
                target.clientId = defaultClientId;
            }
            Tuple2<String, String> candidate = Tuple.of(target.sourceJobName, target.clientId);
            int suffix = 0;
            while (usedIds.contains(candidate)) {
                suffix++;
                candidate = Tuple.of(target.sourceJobName, target.clientId + "_" + suffix);
            }
            target.clientId = candidate._2;
            usedIds.add(candidate);
        }
        return targets;
    }
}
