package dev.restate.sdk.http.vertx;

import dev.restate.common.Slice;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Flow;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:dev/restate/sdk/http/vertx/HttpRequestFlowAdapter.class */
public class HttpRequestFlowAdapter implements Flow.Publisher<Slice> {
    private static final Logger LOG = LogManager.getLogger(HttpRequestFlowAdapter.class);
    private final HttpServerRequest httpServerRequest;
    private Flow.Subscriber<? super Slice> inputMessagesSubscriber;
    private long subscriberRequest = 0;
    private final Queue<ByteBuffer> buffers = new ArrayDeque();

    /* JADX INFO: Access modifiers changed from: package-private */
    public HttpRequestFlowAdapter(HttpServerRequest httpServerRequest) {
        this.httpServerRequest = httpServerRequest;
    }

    @Override // java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super Slice> subscriber) {
        this.inputMessagesSubscriber = subscriber;
        this.inputMessagesSubscriber.onSubscribe(new Flow.Subscription() { // from class: dev.restate.sdk.http.vertx.HttpRequestFlowAdapter.1
            @Override // java.util.concurrent.Flow.Subscription
            public void request(long j) {
                HttpRequestFlowAdapter.this.handleSubscriptionRequest(j);
            }

            @Override // java.util.concurrent.Flow.Subscription
            public void cancel() {
                HttpRequestFlowAdapter.this.closeRequest();
            }
        });
        this.httpServerRequest.handler(this::handleIncomingBuffer);
        this.httpServerRequest.exceptionHandler(this::handleRequestFailure);
        this.httpServerRequest.endHandler(this::handleRequestEnd);
    }

    private void closeRequest() {
        if (this.httpServerRequest.isEnded()) {
            return;
        }
        this.httpServerRequest.end();
    }

    private void handleSubscriptionRequest(long j) {
        if (j == Long.MAX_VALUE) {
            this.subscriberRequest = j;
        } else {
            this.subscriberRequest += j;
            if (this.subscriberRequest < 0) {
                this.subscriberRequest = Long.MAX_VALUE;
            }
        }
        tryProgress();
    }

    private void handleIncomingBuffer(Buffer buffer) {
        if (!this.buffers.isEmpty() || this.subscriberRequest <= 0) {
            this.buffers.add(buffer.getByteBuf().nioBuffer());
            tryProgress();
        } else {
            this.inputMessagesSubscriber.onNext(Slice.wrap(buffer.getByteBuf().nioBuffer()));
            this.subscriberRequest--;
        }
    }

    private void handleRequestFailure(Throwable th) {
        LOG.trace("Request error", th);
        this.inputMessagesSubscriber.onError(th);
    }

    private void handleRequestEnd(Void r4) {
        LOG.trace("Request end");
        this.inputMessagesSubscriber.onComplete();
        this.inputMessagesSubscriber = null;
    }

    private void tryProgress() {
        ByteBuffer poll;
        while (this.subscriberRequest > 0 && (poll = this.buffers.poll()) != null) {
            this.subscriberRequest--;
            this.inputMessagesSubscriber.onNext(Slice.wrap(poll));
        }
    }
}
