package io.reactivesocket.local.internal;

import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.EmptySubject;
import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.CancellableSubscriber;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import java.nio.channels.ClosedChannelException;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/reactivesocket/local/internal/PeerConnector.class */
/**
 * Wires two in-process {@link DuplexConnection} endpoints (a "client" side and a "server" side)
 * together so that frames sent on one endpoint are delivered to the receiver of the other.
 *
 * <p>Both endpoints share a single close notifier: completing it (through {@link #shutdown()} or
 * through {@code close()} on either endpoint) is observed by both endpoints.
 *
 * <p>Instances are created with {@link #connect(String, int)}.
 */
public class PeerConnector {
    private static final Logger logger = LoggerFactory.getLogger(PeerConnector.class);

    /** Label used only in the endpoints' {@code toString()} output. */
    private final String name;

    /** Shared by both endpoints; completed on shutdown/close. */
    private final EmptySubject closeNotifier = new EmptySubject();
    private final LocalDuplexConnection server = new LocalDuplexConnection(this.closeNotifier, false);
    private final LocalDuplexConnection client = new LocalDuplexConnection(this.closeNotifier, true);

    /**
     * One endpoint of the local connection pair. Frames passed to {@link #send(Publisher)} are
     * forwarded to the peer endpoint, which hands them to its current receiver subscription.
     */
    public final class LocalDuplexConnection implements DuplexConnection {
        /** Subscription of the current {@link #receive()} subscriber, or {@code null} if none yet. */
        private volatile ValidatingSubscription<Frame> receiver;

        /** Set by {@link #connect(LocalDuplexConnection)}; cleared by the close-notifier callback. */
        private volatile boolean connected;

        private final EmptySubject closeNotifier;

        /** {@code true} for the client endpoint; only used to label {@link #toString()}. */
        private final boolean client;

        /** The opposite endpoint, or {@code null} until {@link #connect(LocalDuplexConnection)} runs. */
        private volatile LocalDuplexConnection peer;

        /**
         * @param emptySubject close notifier shared with the peer endpoint
         * @param z            {@code true} if this is the client endpoint
         */
        private LocalDuplexConnection(EmptySubject emptySubject, boolean z) {
            this.closeNotifier = emptySubject;
            this.client = z;
            // Registered callback (presumably run when the notifier terminates - see
            // Subscribers.doOnTerminate): mark the endpoint disconnected and fail the
            // current receiver, if any, with ClosedChannelException.
            emptySubject.subscribe(Subscribers.doOnTerminate(() -> {
                this.connected = false;
                // Read the volatile field once so the null check and the call see the same object.
                ValidatingSubscription<Frame> current = this.receiver;
                if (current != null) {
                    current.safeOnError(new ClosedChannelException());
                }
            }));
        }

        /**
         * Returns a publisher that, when subscribed, subscribes to {@code publisher} with unbounded
         * demand and forwards every frame to the peer endpoint. Errors and completion of
         * {@code publisher} are relayed to the subscriber; cancelling the subscriber's subscription
         * cancels the subscription to {@code publisher}.
         *
         * <p>Frames emitted while no peer is attached are logged at WARN level and dropped.
         *
         * @param publisher source of frames to deliver to the peer
         * @return publisher signalling the outcome of the write
         */
        public Publisher<Void> send(Publisher<Frame> publisher) {
            return subscriber -> {
                CancellableSubscriber<Frame> writer = Subscribers.create(
                        // Request everything up front: no back-pressure is applied here.
                        subscription -> subscription.request(Long.MAX_VALUE),
                        frame -> {
                            LocalDuplexConnection target = this.peer;
                            if (target != null) {
                                target.receiveFrameFromPeer(frame);
                            } else {
                                PeerConnector.logger.warn(
                                        "Sending a frame but peer not connected. Ignoring frame: {}", frame);
                            }
                        },
                        subscriber::onError,
                        subscriber::onComplete,
                        (Runnable) null);
                subscriber.onSubscribe(ValidatingSubscription.onCancel(subscriber, () -> writer.cancel()));
                publisher.subscribe(writer);
            };
        }

        /**
         * Returns a publisher of the frames sent by the peer. Only one subscription may be active
         * at a time: a subscriber arriving while the current receiver is still active is given an
         * empty subscription followed by an {@link IllegalStateException}.
         *
         * @return publisher of incoming frames
         */
        public Publisher<Frame> receive() {
            return subscriber -> {
                // Non-null only when this subscriber won the receiver slot. Keeping the new
                // subscription in a local (instead of re-reading the field after the lock is
                // released) guarantees the subscriber is handed its own subscription even if a
                // concurrent receive() replaces the field in the meantime.
                ValidatingSubscription<Frame> accepted = null;
                synchronized (this) {
                    ValidatingSubscription<Frame> current = this.receiver;
                    if (current == null || !current.isActive()) {
                        accepted = ValidatingSubscription.empty(subscriber);
                        this.receiver = accepted;
                    }
                }
                if (accepted != null) {
                    subscriber.onSubscribe(accepted);
                } else {
                    subscriber.onSubscribe(ValidatingSubscription.empty(subscriber));
                    subscriber.onError(new IllegalStateException("Only one active subscription allowed."));
                }
            };
        }

        /**
         * @return {@code 1.0} while this endpoint is marked connected, {@code 0.0} otherwise
         */
        public double availability() {
            return this.connected ? 1.0d : 0.0d;
        }

        /**
         * Returns a deferred publisher that completes the shared close notifier on subscription
         * and then hands that notifier to the subscriber.
         *
         * @return publisher tied to the shared close notifier
         */
        public Publisher<Void> close() {
            return Px.defer(() -> {
                this.closeNotifier.onComplete();
                return this.closeNotifier;
            });
        }

        /**
         * @return the shared close notifier
         */
        public Publisher<Void> onClose() {
            return this.closeNotifier;
        }

        /**
         * @return a label of the form {@code [local connection(client|server) - NAME] connected: BOOL}
         */
        @Override
        public String toString() {
            String role = this.client ? "client" : "server";
            return "[local connection(" + role + ") - " + PeerConnector.this.name
                    + "] connected: " + this.connected;
        }

        /**
         * Delivers a frame coming from the peer endpoint to the current receiver subscription.
         * If nobody has subscribed to {@link #receive()} yet, the frame is logged at WARN level
         * and dropped.
         *
         * @param frame frame sent by the peer
         */
        public void receiveFrameFromPeer(Frame frame) {
            // Single volatile read so the null check and the delivery use the same subscription.
            ValidatingSubscription<Frame> current = this.receiver;
            if (current != null) {
                current.safeOnNext(frame);
            } else {
                PeerConnector.logger.warn(
                        "Received a frame but peer not connected. Ignoring frame: {}", frame);
            }
        }

        /**
         * Attaches the opposite endpoint and marks this endpoint as connected.
         *
         * @param localDuplexConnection the peer endpoint that will receive frames sent from here
         */
        public synchronized void connect(LocalDuplexConnection localDuplexConnection) {
            this.peer = localDuplexConnection;
            this.connected = true;
        }
    }

    /**
     * Creates the endpoint pair and cross-links the two endpoints.
     *
     * @param str label used in the endpoints' {@code toString()}
     */
    private PeerConnector(String str) {
        this.name = str;
        this.server.connect(this.client);
        this.client.connect(this.server);
    }

    /**
     * @return the client-side endpoint
     */
    public DuplexConnection forClient() {
        return this.client;
    }

    /**
     * @return the server-side endpoint
     */
    public DuplexConnection forServer() {
        return this.server;
    }

    /** Completes the shared close notifier observed by both endpoints. */
    public void shutdown() {
        this.closeNotifier.onComplete();
    }

    /**
     * Creates a new connected endpoint pair labelled {@code str + '-' + i}.
     *
     * @param str label prefix
     * @param i   numeric suffix appended to the label
     * @return a connector whose client and server endpoints are linked to each other
     */
    public static PeerConnector connect(String str, int i) {
        return new PeerConnector(str + '-' + i);
    }
}
