package io.axoniq.flowcontrol.producer.grpc;

import io.axoniq.flowcontrol.OutgoingStream;
import io.axoniq.flowcontrol.producer.grpc.ActiveSubscriptions;
import io.axoniq.flowcontrol.producer.grpc.shutdown.ProducerCompletedShutDownPolicy;
import io.axoniq.flowcontrol.producer.grpc.subscriptions.RoundRobinSubscriptions;
import io.grpc.stub.CallStreamObserver;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/flowcontrol/producer/grpc/FlowControlledOutgoingStream.class */
/**
 * An {@link OutgoingStream} that forwards messages from the publishers handed to
 * {@link #accept(Publisher)} into a gRPC {@link CallStreamObserver}.
 *
 * <p>Messages are requested from the subscribed publishers one at a time, and only while the
 * outgoing observer reports {@link CallStreamObserver#isReady() ready}. Which subscription is
 * asked next is decided by the supplied {@link ActiveSubscriptions}; when and how the outgoing
 * stream is terminated is decided by the supplied {@link ShutdownPolicy}.
 *
 * <p>The "flow gate" is an {@link AtomicBoolean}: {@code true} means a request chain is currently
 * running, {@code false} means the chain has stopped and must be restarted through
 * {@link #flow()}.
 *
 * @param <Message> the type of message written to the outgoing stream
 */
public class FlowControlledOutgoingStream<Message> implements OutgoingStream<Message> {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlowControlledOutgoingStream.class);

    private final ShutdownPolicy<Message> shutdownPolicy;
    private final ActiveSubscriptions activeSubscriptions;
    private final CallStreamObserver<Message> outgoingStream;
    private final AtomicBoolean flowGate = new AtomicBoolean();

    /**
     * Subscriber attached to every publisher passed to {@link #accept(Publisher)}. It registers
     * itself with the enclosing stream's {@link ActiveSubscriptions} and asks its publisher for a
     * single element each time {@link #request()} is invoked.
     *
     * <p>Declared {@code public} in this source; a decompiler note on the original marks the
     * access as {@code protected}.
     */
    public class FlowControlledSubscriber implements Subscriber<Message>, ActiveSubscriptions.ActiveSubscription {
        private final AtomicReference<Subscription> subscription = new AtomicReference<>();
        /** {@code true} between a {@link #request()} call and the matching {@link #onNext(Object)}. */
        private volatile boolean pending = false;

        protected FlowControlledSubscriber() {
        }

        /**
         * Stores the subscription and registers this subscriber as active, then tries to open the
         * flow gate. A second subscription offered to the same subscriber is cancelled.
         *
         * @param subscription the subscription handed over by the publisher
         * @throws NullPointerException if {@code subscription} is {@code null}
         */
        @Override
        public void onSubscribe(Subscription subscription) {
            if (subscription == null) {
                throw new NullPointerException("Subscription must not be null!");
            }
            if (!this.subscription.compareAndSet(null, subscription)) {
                // Already subscribed: keep the first subscription, reject the new one.
                subscription.cancel();
            } else {
                FlowControlledOutgoingStream.this.activeSubscriptions.add(this);
                FlowControlledOutgoingStream.this.flow();
            }
        }

        /**
         * Writes the message to the outgoing stream, informs the shutdown policy, and then either
         * requests the next message (stream still ready) or closes the flow gate.
         *
         * @param message the element emitted by the publisher
         * @throws NullPointerException if {@code message} is {@code null}
         */
        @Override
        public void onNext(Message message) {
            if (message == null) {
                throw new NullPointerException();
            }
            this.pending = false;
            FlowControlledOutgoingStream.this.outgoingStream.onNext(message);
            FlowControlledOutgoingStream.this.shutdownPolicy.onMessageConsumed(message);
            if (FlowControlledOutgoingStream.this.outgoingStream.isReady()) {
                FlowControlledOutgoingStream.this.request();
                return;
            }
            FlowControlledOutgoingStream.this.flowGate.set(false);
            LOGGER.debug("Flow control gate closed. Cause: stream is not ready.");
            // Re-check right after closing the gate; flow() reopens it if the stream has
            // become ready in the meantime.
            FlowControlledOutgoingStream.this.flow();
        }

        /** Notifies the shutdown policy that this publisher completed and disposes this subscriber. */
        @Override
        public void onComplete() {
            FlowControlledOutgoingStream.this.shutdownPolicy.onPublisherComplete();
            dispose();
        }

        /**
         * Notifies the shutdown policy that this publisher failed and disposes this subscriber.
         *
         * @param th the error signalled by the publisher
         * @throws NullPointerException if {@code th} is {@code null}
         */
        @Override
        public void onError(Throwable th) {
            if (th == null) {
                throw new NullPointerException();
            }
            FlowControlledOutgoingStream.this.shutdownPolicy.onPublisherError(th);
            dispose();
        }

        /** Marks a request as outstanding and asks the publisher for exactly one element. */
        @Override // io.axoniq.flowcontrol.producer.grpc.ActiveSubscriptions.ActiveSubscription
        public void request() {
            this.pending = true;
            this.subscription.get().request(1L);
        }

        /**
         * Removes this subscriber from the active set. If a request was still outstanding, the
         * element it asked for will not arrive through {@code onNext}, so the enclosing stream is
         * asked to issue a new request to keep the chain going.
         */
        private void dispose() {
            LOGGER.debug("Disposing subscription.");
            FlowControlledOutgoingStream.this.activeSubscriptions.remove(this);
            if (this.pending) {
                FlowControlledOutgoingStream.this.request();
            }
        }
    }

    /**
     * Creates a stream using a {@link ProducerCompletedShutDownPolicy} and
     * {@link RoundRobinSubscriptions}.
     *
     * @param callStreamObserver the gRPC observer messages are written to
     * @param executor           executor on which the on-ready callback runs {@link #flow()}
     */
    public FlowControlledOutgoingStream(CallStreamObserver<Message> callStreamObserver, Executor executor) {
        this(callStreamObserver, new ProducerCompletedShutDownPolicy(), new RoundRobinSubscriptions(), executor);
    }

    /**
     * Creates a stream with an explicit shutdown policy and subscription selection strategy.
     *
     * @param callStreamObserver  the gRPC observer messages are written to
     * @param shutdownPolicy      policy notified of consumed messages and publisher termination;
     *                            it is initialized with this stream's two shutdown callbacks
     * @param activeSubscriptions container deciding which active subscription is requested next
     * @param executor            executor on which the on-ready callback runs {@link #flow()}
     */
    public FlowControlledOutgoingStream(CallStreamObserver<Message> callStreamObserver, ShutdownPolicy<Message> shutdownPolicy, ActiveSubscriptions activeSubscriptions, Executor executor) {
        // Assign every field before registering callbacks that capture `this`, so a callback
        // fired early never observes a partially constructed instance.
        this.outgoingStream = callStreamObserver;
        this.shutdownPolicy = shutdownPolicy;
        this.activeSubscriptions = activeSubscriptions;
        this.outgoingStream.setOnReadyHandler(() -> executor.execute(this::flow));
        this.shutdownPolicy.initialize(this::shutdown, this::shutdown);
    }

    /**
     * Subscribes a new {@link FlowControlledSubscriber} to the given publisher.
     *
     * @param publisher the source of messages to forward
     */
    @Override // io.axoniq.flowcontrol.OutgoingStream, java.util.function.Consumer
    public void accept(Publisher<Message> publisher) {
        publisher.subscribe(new FlowControlledSubscriber());
    }

    /**
     * Tries to open the flow gate and start a request chain. Nothing happens when there is no
     * active subscription to ask, the outgoing stream is not ready, or the gate is already open.
     *
     * <p>Declared {@code public} in this source; a decompiler note on the original marks it as
     * {@code private}.
     */
    public void flow() {
        if (!this.activeSubscriptions.hasNext() || !this.outgoingStream.isReady()) {
            LOGGER.debug("Flow control gate still closed.");
        } else if (!this.flowGate.compareAndSet(false, true)) {
            // Another caller won the race and is already driving the request chain.
            LOGGER.debug("Flow control gate already open.");
        } else {
            request();
            LOGGER.debug("Flow control gate opened.");
        }
    }

    /**
     * Requests one message from the next active subscription if one is available and the outgoing
     * stream is ready; otherwise closes the flow gate and immediately re-evaluates via
     * {@link #flow()}.
     *
     * <p>Declared {@code public} in this source; a decompiler note on the original marks it as
     * {@code private}.
     */
    public void request() {
        boolean hasNext = this.activeSubscriptions.hasNext();
        if (hasNext && this.outgoingStream.isReady()) {
            Optional<ActiveSubscriptions.ActiveSubscription> next = this.activeSubscriptions.next();
            if (next.isPresent()) {
                next.get().request();
                return;
            }
        }
        this.flowGate.set(false);
        LOGGER.debug("Flow control gate closed. Cause: {}.", hasNext ? "stream is not ready" : "no message available yet");
        // Conditions may have changed between the checks above and closing the gate.
        flow();
    }

    /** Shuts down the policy and completes the outgoing stream normally. */
    private void shutdown() {
        if (LOGGER.isDebugEnabled()) {
            // Log the caller's stack to make the origin of the shutdown traceable.
            LOGGER.debug("Shutdown call: {}", Arrays.stream(Thread.currentThread().getStackTrace())
                    .map(StackTraceElement::toString)
                    .collect(Collectors.joining("\n")));
        }
        this.shutdownPolicy.shutdown();
        this.outgoingStream.onCompleted();
    }

    /**
     * Shuts down the policy and terminates the outgoing stream with an error.
     *
     * @param th the error propagated to the outgoing stream
     */
    private void shutdown(Throwable th) {
        if (LOGGER.isDebugEnabled()) {
            // Log the caller's stack to make the origin of the shutdown traceable.
            LOGGER.debug("Shutdown call exceptionally: " + Arrays.stream(Thread.currentThread().getStackTrace())
                    .map(StackTraceElement::toString)
                    .collect(Collectors.joining("\n")), th);
        }
        this.shutdownPolicy.shutdown();
        this.outgoingStream.onError(th);
    }
}
