package io.axoniq.flowcontrol.consumer.grpc.client;

import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/axoniq/flowcontrol/consumer/grpc/client/ClientResponseObserverSubscription.class */
public class ClientResponseObserverSubscription<Message> implements ClientResponseObserver<Object, Message>, Subscription {
    private static final Logger logger = LoggerFactory.getLogger(ClientResponseObserverSubscription.class);
    private final Subscriber<? super Message> subscriber;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private final AtomicBoolean subscribed = new AtomicBoolean(false);
    private final AtomicReference<ClientCallStreamObserver<Object>> outgoingStream = new AtomicReference<>();
    private final Queue<Signal> queue = new ConcurrentLinkedQueue();
    private final AtomicLong requested = new AtomicLong(0);
    private final AtomicBoolean signalGate = new AtomicBoolean(false);

    /* loaded from: input_file:io/axoniq/flowcontrol/consumer/grpc/client/ClientResponseObserverSubscription$Complete.class */
    private class Complete implements Signal {
        private Complete() {
        }

        @Override // io.axoniq.flowcontrol.consumer.grpc.client.ClientResponseObserverSubscription.Signal
        public boolean consumeRequest() {
            return false;
        }

        @Override // io.axoniq.flowcontrol.consumer.grpc.client.ClientResponseObserverSubscription.Signal
        public void run() {
            ClientResponseObserverSubscription.this.subscriber.onComplete();
        }
    }

    /* loaded from: input_file:io/axoniq/flowcontrol/consumer/grpc/client/ClientResponseObserverSubscription$Error.class */
    private class Error implements Signal {
        private final Throwable exception;

        private Error(Throwable th) {
            this.exception = th;
        }

        @Override // io.axoniq.flowcontrol.consumer.grpc.client.ClientResponseObserverSubscription.Signal
        public boolean consumeRequest() {
            return false;
        }

        @Override // io.axoniq.flowcontrol.consumer.grpc.client.ClientResponseObserverSubscription.Signal
        public void run() {
            ClientResponseObserverSubscription.this.subscriber.onError(this.exception);
        }
    }

    /* loaded from: input_file:io/axoniq/flowcontrol/consumer/grpc/client/ClientResponseObserverSubscription$Next.class */
    private class Next implements Signal {
        private final Message message;

        private Next(Message message) {
            this.message = message;
        }

        @Override // io.axoniq.flowcontrol.consumer.grpc.client.ClientResponseObserverSubscription.Signal
        public boolean consumeRequest() {
            return true;
        }

        @Override // io.axoniq.flowcontrol.consumer.grpc.client.ClientResponseObserverSubscription.Signal
        public void run() {
            ClientResponseObserverSubscription.this.subscriber.onNext(this.message);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/axoniq/flowcontrol/consumer/grpc/client/ClientResponseObserverSubscription$Signal.class */
    public interface Signal {
        boolean consumeRequest();

        void run();
    }

    public ClientResponseObserverSubscription(Subscriber<? super Message> subscriber) {
        this.subscriber = subscriber;
    }

    public void beforeStart(ClientCallStreamObserver<Object> clientCallStreamObserver) {
        logger.debug("invoking before start on call {}", Integer.valueOf(System.identityHashCode(this)));
        if (this.cancelled.get()) {
            logger.debug("Call {} has been cancelled before start.", Integer.valueOf(System.identityHashCode(this)));
            return;
        }
        try {
            this.outgoingStream.set(clientCallStreamObserver);
            logger.debug("Disabling the automatic gRPC request for permits for call {}.", Integer.valueOf(System.identityHashCode(this)));
            clientCallStreamObserver.disableAutoRequestWithInitial(0);
            logger.debug("gRPC manual request mode enabled for call {}.", Integer.valueOf(System.identityHashCode(this)));
        } catch (Exception e) {
            logger.warn("error during before start: ", e);
            submit(new Error(e));
        }
    }

    public void onNext(Message message) {
        logger.trace("Message received for call {}.", Integer.valueOf(System.identityHashCode(this)));
        submit(new Next(message));
    }

    public void onError(Throwable th) {
        logger.debug("Call completed exceptionally. ", th);
        submit(new Error(th));
    }

    public void onCompleted() {
        logger.debug("Call {} completed.", Integer.valueOf(System.identityHashCode(this)));
        submit(new Complete());
    }

    public void request(long j) {
        if (j <= 0) {
            this.subscriber.onError(new IllegalArgumentException("negative subscription request"));
            return;
        }
        this.outgoingStream.get().request(j > 2147483647L ? Integer.MAX_VALUE : (int) j);
        this.requested.updateAndGet(j2 -> {
            return j2 + Math.min(Long.MAX_VALUE - j2, j);
        });
        signal();
    }

    public void cancel() {
        logger.debug("The call has been cancelled.");
        this.cancelled.set(true);
        ClientCallStreamObserver<Object> clientCallStreamObserver = this.outgoingStream.get();
        if (clientCallStreamObserver != null) {
            clientCallStreamObserver.onCompleted();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void afterSubscribe() {
        this.subscribed.set(true);
        signal();
    }

    private void submit(Signal signal) {
        this.queue.add(signal);
        signal();
    }

    private void signal() {
        if (canSignal() && this.signalGate.compareAndSet(false, true)) {
            drain();
        }
    }

    private void drain() {
        while (canSignal()) {
            Signal poll = this.queue.poll();
            if (poll != null) {
                poll.run();
                if (poll.consumeRequest()) {
                    this.requested.decrementAndGet();
                }
            }
        }
        this.signalGate.set(false);
        signal();
    }

    private boolean canSignal() {
        Signal peek = this.queue.peek();
        return this.subscribed.get() && !this.cancelled.get() && peek != null && (!peek.consumeRequest() || this.requested.get() > 0);
    }
}
