package io.reactivex.mantis.network.push;

import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.mql.jvm.core.Query;
import io.mantisrx.mql.shaded.clojure.java.api.Clojure;
import io.mantisrx.mql.shaded.clojure.lang.IFn;
import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurators;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerRequest;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerResponse;
import mantis.io.reactivex.netty.protocol.http.server.RequestHandler;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import mantis.io.reactivex.netty.server.RxServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;

/* loaded from: input_file:io/reactivex/mantis/network/push/PushServerSse.class */
public class PushServerSse<T, S> extends PushServer<T, ServerSentEvent> {
    public static final String PROCESSED_COUNTER_METRIC_NAME = "processedCounter";
    public static final String DROPPED_COUNTER_METRIC_NAME = "droppedCounter";
    private static IFn mqlMakeQuery;
    private static IFn mqlParses;
    private Func2<Map<String, List<String>>, S, Void> requestPreprocessor;
    private Func2<Map<String, List<String>>, S, Void> requestPostprocessor;
    private final Func2<Map<String, List<String>>, S, Void> subscribeProcessor;
    private S processorState;
    private Func1<Map<String, List<String>>, Func1<T, Boolean>> predicate;
    private boolean supportLegacyMetrics;
    private MetricsRegistry metricsRegistry;
    private static final Logger logger = LoggerFactory.getLogger(PushServerSse.class);
    private static IFn require = Clojure.var("io.mantisrx.mql.shaded.clojure.core", "require");

    public PushServerSse(PushTrigger<T> pushTrigger, ServerConfig<T> serverConfig, PublishSubject<String> publishSubject, Func2<Map<String, List<String>>, S, Void> func2, Func2<Map<String, List<String>>, S, Void> func22, Func2<Map<String, List<String>>, S, Void> func23, S s, boolean z) {
        super(pushTrigger, serverConfig, publishSubject);
        this.metricsRegistry = serverConfig.getMetricsRegistry();
        this.predicate = serverConfig.getPredicate();
        this.processorState = s;
        this.requestPostprocessor = func22;
        this.requestPreprocessor = func2;
        this.subscribeProcessor = func23;
        this.supportLegacyMetrics = z;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Metrics registerSseMetrics(String str, String str2) {
        return this.metricsRegistry.registerAndGet(new Metrics.Builder().id(this.supportLegacyMetrics ? "ServerSentEventRequestHandler" : "PushServerSse", new Tag[]{new BasicTag("clientId", (String) Optional.ofNullable(str).orElse("none")), new BasicTag("sockAddr", (String) Optional.ofNullable(str2).orElse("none"))}).addCounter(PROCESSED_COUNTER_METRIC_NAME).addCounter(DROPPED_COUNTER_METRIC_NAME).build());
    }

    @Override // io.reactivex.mantis.network.push.PushServer
    public RxServer<?, ?> createServer() {
        return RxNetty.newHttpServerBuilder(this.port, new RequestHandler<String, ServerSentEvent>() { // from class: io.reactivex.mantis.network.push.PushServerSse.1
            public Observable<Void> handle(HttpServerRequest<String> httpServerRequest, HttpServerResponse<ServerSentEvent> httpServerResponse) {
                Metrics registerSseMetrics;
                final Map queryParameters = httpServerRequest.getQueryParameters();
                boolean z = false;
                boolean z2 = false;
                AtomicLong atomicLong = new AtomicLong(2L);
                SerializedSubject<String, String> serialized = PublishSubject.create().toSerialized();
                AtomicLong atomicLong2 = new AtomicLong(1000L);
                boolean z3 = false;
                AtomicLong atomicLong3 = new AtomicLong();
                Subscription subscription = null;
                Subscription subscription2 = null;
                boolean z4 = false;
                long j = 0;
                String str = null;
                String str2 = null;
                String str3 = null;
                Func1<T, Boolean> func1 = null;
                if (PushServerSse.this.predicate != null) {
                    func1 = (Func1) PushServerSse.this.predicate.call(queryParameters);
                }
                if (queryParameters != null && !queryParameters.isEmpty()) {
                    if (queryParameters.containsKey("id")) {
                        str3 = (String) ((List) queryParameters.get("id")).get(0);
                    }
                    if (queryParameters.containsKey("slotId")) {
                        str2 = (String) ((List) queryParameters.get("slotId")).get(0);
                    }
                    if (queryParameters.containsKey("groupId")) {
                        str = (String) ((List) queryParameters.get("groupId")).get(0);
                    }
                    if (queryParameters.containsKey("clientId")) {
                        str = (String) ((List) queryParameters.get("clientId")).get(0);
                    }
                    if (queryParameters.containsKey("heartbeatSec")) {
                        atomicLong.set(Long.parseLong((String) ((List) queryParameters.get("heartbeatSec")).get(0)));
                        if (atomicLong.get() < 1) {
                            throw new IllegalArgumentException("Sampling rate too low: 0");
                        }
                        z = true;
                    }
                    if (queryParameters != null && queryParameters.containsKey("mantis.EnableCompressedBinary") && "true".equalsIgnoreCase((String) ((List) queryParameters.get("mantis.EnableCompressedBinary")).get(0))) {
                        PushServerSse.logger.info("Binary compression requested");
                        z2 = true;
                    }
                    if (queryParameters.containsKey("enablePings") && "true".equalsIgnoreCase((String) ((List) queryParameters.get("enablePings")).get(0))) {
                        z = true;
                    }
                    if (queryParameters.containsKey("enableMetaMessages") && "true".equalsIgnoreCase((String) ((List) queryParameters.get("enableMetaMessages")).get(0))) {
                        z3 = true;
                    }
                    if (queryParameters.containsKey("metaMessagesSec")) {
                        atomicLong2.set(Long.parseLong((String) ((List) queryParameters.get("metaMessagesSec")).get(0)));
                        if (atomicLong2.get() < 250) {
                            throw new IllegalArgumentException("Meta message frequence rate too low: " + atomicLong2.get());
                        }
                        z3 = true;
                    }
                    if (queryParameters.containsKey("sample")) {
                        j = Long.parseLong((String) ((List) queryParameters.get("sample")).get(0)) * 1000;
                        if (j < 50) {
                            throw new IllegalArgumentException("Sampling rate too low: " + j);
                        }
                        z4 = true;
                    }
                    if (queryParameters.containsKey("sampleMSec")) {
                        j = Long.parseLong((String) ((List) queryParameters.get("sampleMSec")).get(0));
                        if (j < 50) {
                            throw new IllegalArgumentException("Sampling rate too low: " + j);
                        }
                        z4 = true;
                    }
                    if (queryParameters.containsKey("mql")) {
                        String str4 = (String) ((List) queryParameters.get("mql")).get(0);
                        if (((Boolean) PushServerSse.mqlParses.invoke(str4)).booleanValue()) {
                            Query query = (Query) PushServerSse.mqlMakeQuery.invoke(str, str4);
                            func1 = obj -> {
                                return Boolean.valueOf(obj instanceof Map ? query.matches((Map) obj).booleanValue() : true);
                            };
                        }
                    }
                }
                InetSocketAddress inetSocketAddress = (InetSocketAddress) httpServerResponse.getChannel().remoteAddress();
                if (str == null) {
                    String inetAddress = inetSocketAddress.getAddress().toString();
                    registerSseMetrics = PushServerSse.this.registerSseMetrics(inetAddress, inetAddress);
                } else {
                    registerSseMetrics = PushServerSse.this.registerSseMetrics(str, inetSocketAddress.getAddress().toString());
                }
                Counter counter = registerSseMetrics.getCounter(PushServerSse.PROCESSED_COUNTER_METRIC_NAME);
                Counter counter2 = registerSseMetrics.getCounter(PushServerSse.DROPPED_COUNTER_METRIC_NAME);
                httpServerResponse.getHeaders().set("Access-Control-Allow-Origin", "*");
                httpServerResponse.getHeaders().set("content-type", "text/event-stream");
                httpServerResponse.getHeaders().set("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
                httpServerResponse.getHeaders().set("Pragma", "no-cache");
                httpServerResponse.flush();
                if (queryParameters != null && PushServerSse.this.requestPreprocessor != null) {
                    PushServerSse.this.requestPreprocessor.call(queryParameters, PushServerSse.this.processorState);
                }
                if (z3 && atomicLong2.get() > 0) {
                    PushServerSse.logger.info("Enabling Meta messages, interval : " + atomicLong2.get() + " ms");
                    subscription2 = serialized.throttleLast(atomicLong2.get(), TimeUnit.MILLISECONDS).doOnNext(str5 -> {
                        if (str5 == null || str5.isEmpty()) {
                            return;
                        }
                        long currentTimeMillis = System.currentTimeMillis();
                        httpServerResponse.writeAndFlush(new ServerSentEvent(httpServerResponse.getAllocator().buffer().writeBytes(str5.getBytes())));
                        atomicLong3.set(currentTimeMillis);
                    }).subscribe();
                }
                if (z && atomicLong.get() > 0) {
                    PushServerSse.logger.info("Enabling hearts, interval: " + atomicLong);
                    subscription = Observable.interval(2L, atomicLong.get(), TimeUnit.SECONDS).doOnNext(l -> {
                        long currentTimeMillis = System.currentTimeMillis();
                        if ((currentTimeMillis - atomicLong3.get()) / 1000 > atomicLong.get()) {
                            httpServerResponse.writeAndFlush(new ServerSentEvent(httpServerResponse.getAllocator().buffer().writeBytes("ping".getBytes())));
                            atomicLong3.set(currentTimeMillis);
                        }
                    }).subscribe();
                }
                Action0 action0 = null;
                if (queryParameters != null && PushServerSse.this.requestPostprocessor != null) {
                    action0 = new Action0() { // from class: io.reactivex.mantis.network.push.PushServerSse.1.1
                        public void call() {
                            PushServerSse.this.requestPostprocessor.call(queryParameters, PushServerSse.this.processorState);
                        }
                    };
                }
                return PushServerSse.this.manageConnectionWithCompression(httpServerResponse, inetSocketAddress.getHostString(), inetSocketAddress.getPort(), str, str2, str3, atomicLong3, z, subscription, z4, j, serialized, subscription2, func1, action0, counter, counter2, new Action0() { // from class: io.reactivex.mantis.network.push.PushServerSse.1.1SubscribeCallback
                    public void call() {
                        if (queryParameters == null || PushServerSse.this.subscribeProcessor == null) {
                            return;
                        }
                        PushServerSse.this.subscribeProcessor.call(queryParameters, PushServerSse.this.processorState);
                    }
                }, z2, true);
            }
        }).pipelineConfigurator(PipelineConfigurators.serveSseConfigurator()).channelOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1048576, 5242880)).build();
    }

    static {
        require.invoke(Clojure.read("io.mantisrx.mql.jvm.interfaces.core"));
        require.invoke(Clojure.read("io.mantisrx.mql.jvm.interfaces.server"));
        mqlMakeQuery = Clojure.var("io.mantisrx.mql.jvm.interfaces.server", "make-query");
        mqlParses = Clojure.var("io.mantisrx.mql.jvm.interfaces.core", "parses?");
    }
}
