package dev.restate.sdk.http.vertx;

import dev.restate.sdk.core.InvocationFlow;
import dev.restate.sdk.core.ProtocolException;
import dev.restate.sdk.core.RestateEndpoint;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.AsciiString;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.reactiverse.contextual.logging.ContextualData;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.impl.HttpServerRequestInternal;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:dev/restate/sdk/http/vertx/RequestHttpServerHandler.class */
class RequestHttpServerHandler implements Handler<HttpServerRequest> {
    private static final String DISCOVER_PATH = "/discover";
    private final RestateEndpoint restateEndpoint;
    private final OpenTelemetry openTelemetry;
    private static final Logger LOG = LogManager.getLogger(RequestHttpServerHandler.class);
    private static final AsciiString X_RESTATE_SERVER_KEY = AsciiString.cached("x-restate-server");
    private static final AsciiString X_RESTATE_SERVER_VALUE = AsciiString.cached("restate-sdk-java/1.1.1_d479a466");
    private static final Pattern SLASH = Pattern.compile(Pattern.quote("/"));
    static TextMapGetter<MultiMap> OTEL_TEXT_MAP_GETTER = new TextMapGetter<MultiMap>() { // from class: dev.restate.sdk.http.vertx.RequestHttpServerHandler.1
        public Iterable<String> keys(MultiMap multiMap) {
            return multiMap.names();
        }

        public String get(MultiMap multiMap, String str) {
            if (multiMap == null) {
                return null;
            }
            return multiMap.get(str);
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    public RequestHttpServerHandler(RestateEndpoint restateEndpoint, OpenTelemetry openTelemetry) {
        this.restateEndpoint = restateEndpoint;
        this.openTelemetry = openTelemetry;
    }

    public void handle(HttpServerRequest httpServerRequest) {
        URI create = URI.create(httpServerRequest.uri());
        if (DISCOVER_PATH.equalsIgnoreCase(create.getPath())) {
            handleDiscoveryRequest(httpServerRequest);
            return;
        }
        String[] split = SLASH.split(create.getPath());
        if (split.length < 3) {
            LOG.warn("Path doesn't match the pattern /invoke/ServiceName/HandlerName nor /discover: '{}'", httpServerRequest.path());
            httpServerRequest.response().setStatusCode(HttpResponseStatus.NOT_FOUND.code()).end();
            return;
        }
        String str = split[split.length - 2];
        String str2 = split[split.length - 1];
        Context extract = this.openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), httpServerRequest.headers(), OTEL_TEXT_MAP_GETTER);
        io.vertx.core.Context context = ((HttpServerRequestInternal) httpServerRequest).context();
        try {
            RestateEndpoint restateEndpoint = this.restateEndpoint;
            String header = httpServerRequest.getHeader(HttpHeaderNames.CONTENT_TYPE);
            Objects.requireNonNull(httpServerRequest);
            Flow.Subscriber<? super ByteBuffer> resolve = restateEndpoint.resolve(header, str, str2, httpServerRequest::getHeader, extract, ContextualData::put, currentContextExecutor(context));
            LOG.debug("Handling request to " + str + "/" + str2);
            HttpServerResponse response = httpServerRequest.response();
            response.setStatusCode(HttpResponseStatus.OK.code());
            response.putHeader(HttpHeaderNames.CONTENT_TYPE, resolve.responseContentType()).putHeader(X_RESTATE_SERVER_KEY, X_RESTATE_SERVER_VALUE);
            response.setChunked(true);
            HttpRequestFlowAdapter httpRequestFlowAdapter = new HttpRequestFlowAdapter(httpServerRequest);
            InvocationFlow.InvocationOutputSubscriber httpResponseFlowAdapter = new HttpResponseFlowAdapter(response);
            httpRequestFlowAdapter.subscribe(resolve);
            resolve.subscribe(httpResponseFlowAdapter);
        } catch (ProtocolException e) {
            LOG.warn("Error when handling the request", e);
            httpServerRequest.response().setStatusCode(e.getCode()).putHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain").putHeader(X_RESTATE_SERVER_KEY, X_RESTATE_SERVER_VALUE).end(e.getMessage());
        }
    }

    private Executor currentContextExecutor(io.vertx.core.Context context) {
        return runnable -> {
            context.runOnContext(r3 -> {
                runnable.run();
            });
        };
    }

    private void handleDiscoveryRequest(HttpServerRequest httpServerRequest) {
        try {
            RestateEndpoint.DiscoveryResponse handleDiscoveryRequest = this.restateEndpoint.handleDiscoveryRequest(httpServerRequest.getHeader(HttpHeaderNames.ACCEPT));
            httpServerRequest.response().setStatusCode(HttpResponseStatus.OK.code()).putHeader(X_RESTATE_SERVER_KEY, X_RESTATE_SERVER_VALUE).putHeader(HttpHeaderNames.CONTENT_TYPE, handleDiscoveryRequest.getContentType()).end(Buffer.buffer(handleDiscoveryRequest.getSerializedManifest()));
        } catch (ProtocolException e) {
            LOG.warn("Error when handling the discovery request", e);
            httpServerRequest.response().setStatusCode(e.getCode()).putHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain").putHeader(X_RESTATE_SERVER_KEY, X_RESTATE_SERVER_VALUE).end(e.getMessage());
        }
    }
}
