package dev.restate.sdk.http.vertx;

import dev.restate.common.Slice;
import dev.restate.sdk.core.EndpointRequestHandler;
import dev.restate.sdk.core.ProtocolException;
import dev.restate.sdk.endpoint.Endpoint;
import dev.restate.sdk.endpoint.HeadersAccessor;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.util.AsciiString;
import io.reactiverse.contextual.logging.ContextualData;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.impl.HttpServerRequestInternal;
import java.net.URI;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:dev/restate/sdk/http/vertx/HttpEndpointRequestHandler.class */
public class HttpEndpointRequestHandler implements Handler<HttpServerRequest> {
    private static final Logger LOG = LogManager.getLogger(HttpEndpointRequestHandler.class);
    private static final AsciiString X_RESTATE_SERVER_KEY = AsciiString.cached("x-restate-server");
    private static final AsciiString X_RESTATE_SERVER_VALUE = AsciiString.cached("restate-sdk-java/2.0.0_e254573d");
    private final EndpointRequestHandler endpoint;

    private HttpEndpointRequestHandler(Endpoint endpoint) {
        this.endpoint = EndpointRequestHandler.forBidiStream(endpoint);
    }

    public void handle(final HttpServerRequest httpServerRequest) {
        try {
            Flow.Subscriber<? super Slice> processorForRequest = this.endpoint.processorForRequest(URI.create(httpServerRequest.uri()).getPath(), new HeadersAccessor() { // from class: dev.restate.sdk.http.vertx.HttpEndpointRequestHandler.1
                public Iterable<String> keys() {
                    return httpServerRequest.headers().names();
                }

                public String get(String str) {
                    return httpServerRequest.getHeader(str);
                }
            }, ContextualData::put, currentContextExecutor(((HttpServerRequestInternal) httpServerRequest).context()));
            HttpServerResponse response = httpServerRequest.response();
            response.setStatusCode(processorForRequest.statusCode());
            response.putHeader(HttpHeaderNames.CONTENT_TYPE, processorForRequest.responseContentType()).putHeader(X_RESTATE_SERVER_KEY, X_RESTATE_SERVER_VALUE);
            response.setChunked(true);
            HttpRequestFlowAdapter httpRequestFlowAdapter = new HttpRequestFlowAdapter(httpServerRequest);
            HttpResponseFlowAdapter httpResponseFlowAdapter = new HttpResponseFlowAdapter(response);
            httpRequestFlowAdapter.subscribe(processorForRequest);
            processorForRequest.subscribe(httpResponseFlowAdapter);
        } catch (ProtocolException e) {
            LOG.warn("Error when handling the request", e);
            httpServerRequest.response().setStatusCode(e.getCode()).putHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain").putHeader(X_RESTATE_SERVER_KEY, X_RESTATE_SERVER_VALUE).end(e.getMessage());
        }
    }

    private Executor currentContextExecutor(Context context) {
        return runnable -> {
            context.runOnContext(r3 -> {
                runnable.run();
            });
        };
    }

    public static HttpEndpointRequestHandler fromEndpoint(Endpoint endpoint) {
        return new HttpEndpointRequestHandler(endpoint);
    }
}
