package io.mantisrx.connectors.publish.source.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.connectors.publish.core.QueryRegistry;
import io.mantisrx.publish.proto.MantisServerSubscriptionEnvelope;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.AsciiString;
import io.netty.util.CharsetUtil;
import mantis.io.reactivex.netty.protocol.http.server.UriInfoHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.subjects.Subject;

/* loaded from: input_file:io/mantisrx/connectors/publish/source/http/HttpSourceServerHandler.class */
/**
 * Netty inbound handler for the HTTP push source.
 *
 * <ul>
 *   <li>{@code GET}: replies with the subscriptions currently held by the {@link QueryRegistry}
 *       for the request's query parameters, wrapped in a {@link MantisServerSubscriptionEnvelope}
 *       and serialized as JSON.</li>
 *   <li>{@code POST}: decodes the request body as UTF-8, publishes it to the event subject and
 *       replies with a plain-text {@code OK}.</li>
 *   <li>Any other method: replies with {@code 405 Method Not Allowed}.</li>
 * </ul>
 *
 * <p>Every handled request increments one of the counters registered under the
 * {@code PushServer_incoming} metric group.
 */
public class HttpSourceServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpSourceServerHandler.class);
    /** ASCII bytes of "OK": the body acknowledging an accepted POST. */
    private static final byte[] CONTENT = {79, 75};
    private static final AsciiString CONTENT_TYPE = AsciiString.cached("Content-Type");
    private static final AsciiString CONTENT_LENGTH = AsciiString.cached("Content-Length");
    private static final AsciiString CONNECTION = AsciiString.cached("Connection");
    private static final AsciiString KEEP_ALIVE = AsciiString.cached("keep-alive");
    private static final AsciiString CLOSE = AsciiString.cached("close");
    private static final AsciiString ALLOW = AsciiString.cached("Allow");
    private final Counter getRequestCount;
    private final Counter unknownRequestCount;
    private final Counter postRequestCount;
    private final QueryRegistry registry;
    private final Subject<String, String> eventSubject;
    // Serializes the subscription envelope returned for GET requests.
    ObjectMapper mapper = new ObjectMapper();
    // Metric group under which the three request counters are registered.
    MetricGroupId metricGroupId = new MetricGroupId("PushServer_incoming");

    /**
     * Creates the handler and registers its request counters with the metrics registry.
     *
     * @param queryRegistry source of the current subscriptions returned for GET requests
     * @param subject       sink that receives the UTF-8 body of every POST request
     */
    public HttpSourceServerHandler(QueryRegistry queryRegistry, Subject<String, String> subject) {
        this.registry = queryRegistry;
        this.eventSubject = subject;
        Metrics metrics = MetricsRegistry.getInstance().registerAndGet(
                new Metrics.Builder()
                        .id(this.metricGroupId)
                        .addCounter("GetRequestCount")
                        .addCounter("PostRequestCount")
                        .addCounter("UnknownRequestCount")
                        .build());
        this.getRequestCount = metrics.getCounter("GetRequestCount");
        this.unknownRequestCount = metrics.getCounter("UnknownRequestCount");
        this.postRequestCount = metrics.getCounter("PostRequestCount");
    }

    /**
     * Flushes the channel. Responses are queued with {@code write(...)} only, so this is the
     * point at which they are actually sent.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.flush();
    }

    /**
     * Dispatches an inbound HTTP request by method. Messages that are not an {@link HttpRequest}
     * are ignored.
     *
     * @param channelHandlerContext context used to write the response
     * @param httpObject            the inbound HTTP message
     */
    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) {
        if (!(httpObject instanceof HttpRequest)) {
            return;
        }
        HttpRequest httpRequest = (HttpRequest) httpObject;
        boolean isKeepAlive = HttpUtil.isKeepAlive(httpRequest);
        HttpMethod method = httpRequest.method();

        if (HttpMethod.GET.equals(method)) {
            this.getRequestCount.increment();
            handleGet(channelHandlerContext, httpRequest, isKeepAlive);
        } else if (HttpMethod.POST.equals(method)) {
            this.postRequestCount.increment();
            handlePost(channelHandlerContext, (FullHttpMessage) httpObject, isKeepAlive);
        } else {
            this.unknownRequestCount.increment();
            // Always answer: leaving the request without a response makes the client wait until
            // its own timeout fires.
            DefaultFullHttpResponse notAllowed = new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED, Unpooled.EMPTY_BUFFER);
            notAllowed.headers().set(ALLOW, "GET, POST");
            writeResponse(channelHandlerContext, notAllowed, isKeepAlive);
        }
    }

    /** Replies with the current subscriptions as JSON, or with a 500 if they cannot be produced. */
    private void handleGet(ChannelHandlerContext ctx, HttpRequest request, boolean keepAlive) {
        byte[] payload;
        try {
            payload = this.mapper.writeValueAsBytes(new MantisServerSubscriptionEnvelope(
                    this.registry.getCurrentSubscriptions(
                            new UriInfoHolder(request.uri()).getQueryParameters())));
        } catch (Exception e) {
            LOGGER.error("problem reading from channel", e);
            // Report the failure to the caller instead of silently dropping the request.
            writeResponse(ctx, new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.EMPTY_BUFFER),
                    keepAlive);
            return;
        }
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(payload));
        response.headers().set(CONTENT_TYPE, "application/json");
        writeResponse(ctx, response, keepAlive);
    }

    /** Publishes the UTF-8 request body to the event subject and acknowledges with "OK". */
    private void handlePost(ChannelHandlerContext ctx, FullHttpMessage message, boolean keepAlive) {
        String body = message.content().toString(CharsetUtil.UTF_8);
        LOGGER.debug("got data {}", body);
        this.eventSubject.onNext(body);
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(CONTENT));
        response.headers().set(CONTENT_TYPE, "text/plain");
        writeResponse(ctx, response, keepAlive);
    }

    /**
     * Sets the Content-Length and Connection headers and queues the response. When the client
     * did not ask for keep-alive, the channel is closed once the write completes.
     */
    private static void writeResponse(ChannelHandlerContext ctx, DefaultFullHttpResponse response,
                                      boolean keepAlive) {
        response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
        if (keepAlive) {
            response.headers().set(CONNECTION, KEEP_ALIVE);
            ctx.write(response);
        } else {
            // Tell the client the connection is going away before closing it.
            response.headers().set(CONNECTION, CLOSE);
            ctx.write(response).addListener(ChannelFutureListener.CLOSE);
        }
    }
}
