package dev.restate.sdk.http.vertx;

import dev.restate.common.Slice;
import dev.restate.sdk.core.ExceptionUtils;
import io.netty.buffer.Unpooled;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import java.util.concurrent.Flow;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:dev/restate/sdk/http/vertx/HttpResponseFlowAdapter.class */
public class HttpResponseFlowAdapter implements Flow.Subscriber<Slice> {
    private static final Logger LOG = LogManager.getLogger(HttpResponseFlowAdapter.class);
    private final HttpServerResponse httpServerResponse;
    private Flow.Subscription outputSubscription;

    /* JADX INFO: Access modifiers changed from: package-private */
    public HttpResponseFlowAdapter(HttpServerResponse httpServerResponse) {
        this.httpServerResponse = httpServerResponse;
        this.httpServerResponse.exceptionHandler(this::propagateWireFailure);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        this.outputSubscription = subscription;
        this.outputSubscription.request(Long.MAX_VALUE);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(Slice slice) {
        if (this.httpServerResponse.ended()) {
            cancelSubscription();
        } else {
            this.httpServerResponse.write(Buffer.buffer(Unpooled.wrappedBuffer(slice.asReadOnlyByteBuffer())));
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        propagatePublisherFailure(th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        endResponse();
    }

    private void propagateWireFailure(Throwable th) {
        LOG.warn("Error from wire", th);
        endResponse();
    }

    private void propagatePublisherFailure(Throwable th) {
        if (!this.httpServerResponse.headWritten()) {
            ExceptionUtils.findProtocolException(th).ifPresentOrElse(protocolException -> {
                this.httpServerResponse.setStatusCode(protocolException.getCode());
            }, () -> {
                this.httpServerResponse.setStatusCode(500);
            });
        }
        LOG.warn("Error from publisher", th);
        endResponse();
    }

    private void endResponse() {
        LOG.trace("Closing response");
        if (!this.httpServerResponse.ended()) {
            this.httpServerResponse.end();
        }
        cancelSubscription();
    }

    private void cancelSubscription() {
        if (this.outputSubscription != null) {
            LOG.trace("Cancelling subscription");
            Flow.Subscription subscription = this.outputSubscription;
            this.outputSubscription = null;
            subscription.cancel();
        }
    }
}
