package io.axoniq.flowcontrol.consumer.grpc.client;

import io.axoniq.flowcontrol.DuplexStreamException;
import io.axoniq.flowcontrol.IncomingStream;
import io.axoniq.flowcontrol.OutgoingStream;
import io.axoniq.flowcontrol.producer.grpc.FlowControlledOutgoingStream;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:io/axoniq/flowcontrol/consumer/grpc/client/FlowControlledDuplexStream.class */
/**
 * An {@link IncomingStream} backed by a bidirectional streaming call.
 *
 * <p>Each subscription invokes the configured {@link BidirectionalCall}, wraps the request
 * observer it returns in a {@link FlowControlledOutgoingStream} and hands that outgoing stream
 * to the configured consumer, while incoming messages are delivered to the subscriber.
 *
 * @param <IncomingMessage> type of the messages received from the call
 * @param <OutgoingMessage> type of the messages sent through the call
 */
public class FlowControlledDuplexStream<IncomingMessage, OutgoingMessage> implements IncomingStream<IncomingMessage> {
    private final BidirectionalCall<IncomingMessage, OutgoingMessage> call;
    private final Consumer<OutgoingStream<OutgoingMessage>> consumer;
    private final Executor executor;

    /* loaded from: input_file:io/axoniq/flowcontrol/consumer/grpc/client/FlowControlledDuplexStream$BidirectionalCall.class */
    /**
     * Starts a bidirectional call: takes the observer for incoming messages and returns the
     * observer used to send outgoing messages.
     *
     * @param <Res> type of the incoming (response) messages
     * @param <Req> type of the outgoing (request) messages
     */
    @FunctionalInterface
    public interface BidirectionalCall<Res, Req> {
        /**
         * Invokes the call.
         *
         * @param streamObserver observer that receives the incoming messages
         * @return observer through which outgoing messages are sent
         */
        StreamObserver<Req> invoke(StreamObserver<Res> streamObserver);
    }

    /**
     * Creates a duplex stream whose outgoing stream is not handed to anyone on subscription
     * (a no-op consumer is used); callers can still obtain it from
     * {@link #duplexSubscribe(Subscriber)}.
     *
     * @param bidirectionalCall the call to invoke on each subscription
     * @param executor          executor passed to each {@link FlowControlledOutgoingStream}
     */
    public FlowControlledDuplexStream(BidirectionalCall<IncomingMessage, OutgoingMessage> bidirectionalCall, Executor executor) {
        this(bidirectionalCall, outgoingStream -> {
        }, executor);
    }

    /**
     * Creates a duplex stream.
     *
     * @param bidirectionalCall the call to invoke on each subscription
     * @param consumer          receives the outgoing stream created for each subscription
     * @param executor          executor passed to each {@link FlowControlledOutgoingStream}
     */
    public FlowControlledDuplexStream(BidirectionalCall<IncomingMessage, OutgoingMessage> bidirectionalCall, Consumer<OutgoingStream<OutgoingMessage>> consumer, Executor executor) {
        this.call = bidirectionalCall;
        this.consumer = consumer;
        this.executor = executor;
    }

    /**
     * Subscribes to the incoming side of the call, discarding the outgoing stream.
     *
     * <p>Delegates to {@link #duplexSubscribe(Subscriber)}. A failure to set up the call is not
     * rethrown from this method; it is reported through {@code subscriber.onError}.
     *
     * @param subscriber receiver of the incoming messages
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    public void subscribe(@Nonnull Subscriber<? super IncomingMessage> subscriber) {
        try {
            duplexSubscribe(subscriber);
        } catch (DuplexStreamException ignored) {
            // Intentionally swallowed: duplexSubscribe has already signalled the underlying
            // failure via subscriber.onError before throwing, so there is nothing left to report.
        }
    }

    /**
     * Subscribes to the incoming side of the call and returns the outgoing side.
     *
     * <p>The call is invoked and the resulting outgoing stream is passed to the configured
     * consumer. Any {@link Exception} thrown while doing so is captured. The subscriber then
     * always receives {@code onSubscribe}; if a failure was captured it additionally receives
     * {@code onError} with that failure, and the failure is rethrown wrapped in a
     * {@link DuplexStreamException}. The subscription's {@code afterSubscribe()} hook is run
     * exactly once after the subscriber has been signalled, on both the success and failure path.
     *
     * @param subscriber receiver of the incoming messages
     * @return the stream used to send outgoing messages
     * @throws NullPointerException  if {@code subscriber} is {@code null}
     * @throws DuplexStreamException if invoking the call or notifying the consumer failed
     */
    public OutgoingStream<OutgoingMessage> duplexSubscribe(@Nonnull Subscriber<? super IncomingMessage> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("subscriber must not be null");
        }
        // Declared with its concrete type: it is used as the call's response observer, as the
        // Subscription handed to the subscriber, and for its afterSubscribe() hook.
        // NOTE(review): assumes ClientResponseObserverSubscription has a single type parameter — confirm.
        ClientResponseObserverSubscription<IncomingMessage> subscription = new ClientResponseObserverSubscription<>(subscriber);
        Exception failure = null;
        // NOTE(review): raw type kept from the original; parameterize with <OutgoingMessage>
        // if FlowControlledOutgoingStream is generic — confirm against its declaration.
        FlowControlledOutgoingStream outgoing = null;
        try {
            outgoing = new FlowControlledOutgoingStream(this.call.invoke(subscription), this.executor);
            this.consumer.accept(outgoing);
        } catch (Exception e) {
            // Defer reporting: the subscriber must receive onSubscribe before onError.
            failure = e;
        }
        try {
            subscriber.onSubscribe(subscription);
            if (failure != null) {
                subscriber.onError(failure);
            }
        } finally {
            // Single place where the hook runs, so it is invoked once on every path.
            subscription.afterSubscribe();
        }
        if (failure != null) {
            throw new DuplexStreamException(failure);
        }
        return outgoing;
    }
}
