package io.axoniq.eventstore.client.util;

import io.axoniq.ext.io.grpc.stub.StreamObserver;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/eventstore/client/util/FlowControllingStreamObserver.class */
public class FlowControllingStreamObserver<Request, Response> {
    private final StartCommand<Request, Response> startCommand;
    private final OnNext<Request, Response> onNext;
    private final OnError onError;
    private Logger logger = LoggerFactory.getLogger(FlowControllingStreamObserver.class);
    private StreamObserver<Request> requestStream;
    private Request nextRequest;
    private AtomicInteger permitsLeft;
    private int next;
    private int threshold;
    private boolean flowControl;
    private boolean stopped;

    /* loaded from: input_file:io/axoniq/eventstore/client/util/FlowControllingStreamObserver$FlowControlledResponseStream.class */
    private class FlowControlledResponseStream implements StreamObserver<Response> {
        private FlowControlledResponseStream() {
        }

        @Override // io.axoniq.ext.io.grpc.stub.StreamObserver
        public void onNext(Response response) {
            FlowControllingStreamObserver.this.onNext.next(response, FlowControllingStreamObserver.this.requestStream);
        }

        @Override // io.axoniq.ext.io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
            FlowControllingStreamObserver.this.logger.error("Received error: {}", th.getMessage());
            FlowControllingStreamObserver.this.handleError(th);
        }

        @Override // io.axoniq.ext.io.grpc.stub.StreamObserver
        public void onCompleted() {
            FlowControllingStreamObserver.this.logger.debug("OnCompleted");
        }
    }

    /* loaded from: input_file:io/axoniq/eventstore/client/util/FlowControllingStreamObserver$OnError.class */
    public interface OnError {
        void error(Throwable th);
    }

    /* loaded from: input_file:io/axoniq/eventstore/client/util/FlowControllingStreamObserver$OnNext.class */
    public interface OnNext<Request, Response> {
        void next(Response response, StreamObserver<Request> streamObserver);
    }

    /* loaded from: input_file:io/axoniq/eventstore/client/util/FlowControllingStreamObserver$StartCommand.class */
    public interface StartCommand<Request, Response> {
        StreamObserver<Request> call(StreamObserver<Response> streamObserver);
    }

    public FlowControllingStreamObserver(StartCommand<Request, Response> startCommand, OnNext<Request, Response> onNext, OnError onError) {
        this.startCommand = startCommand;
        this.onNext = onNext;
        this.onError = onError;
    }

    public void stop() {
        this.logger.info("Observer stopped");
        this.stopped = true;
        try {
            this.requestStream.onCompleted();
        } catch (Exception e) {
        }
    }

    public void markConsumed(int i) {
        if (!this.flowControl || this.permitsLeft.addAndGet(-i) > this.threshold) {
            return;
        }
        this.requestStream.onNext(this.nextRequest);
        this.permitsLeft.addAndGet(this.next);
    }

    public void start(Request request, Request request2, int i, int i2, int i3) {
        this.nextRequest = request2;
        this.permitsLeft = new AtomicInteger(i);
        this.next = i2;
        this.threshold = i3;
        this.flowControl = request2 != null && i > 0;
        this.requestStream = this.startCommand.call(new FlowControlledResponseStream());
        this.requestStream.onNext(request);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleError(Throwable th) {
        if (this.stopped) {
            return;
        }
        this.onError.error(th);
    }
}
