package org.neo4j.causalclustering.messaging;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.TimeUnit;
import org.neo4j.helpers.SocketAddress;
import org.neo4j.logging.Log;

/* loaded from: input_file:org/neo4j/causalclustering/messaging/NonBlockingChannel.class */
class NonBlockingChannel {
    private static final int CONNECT_BACKOFF_MS = 250;
    private final Log log;
    private final Bootstrap bootstrap;
    private final EventLoop eventLoop;
    private final SocketAddress destination;
    private volatile Channel channel;
    private volatile ChannelFuture fChannel;
    private volatile boolean disposed;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NonBlockingChannel(Bootstrap bootstrap, EventLoop eventLoop, SocketAddress socketAddress, Log log) {
        this.bootstrap = bootstrap;
        this.eventLoop = eventLoop;
        this.destination = socketAddress;
        this.log = log;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        tryConnect();
    }

    private synchronized void tryConnect() {
        if (this.disposed) {
            return;
        }
        if (this.fChannel == null || this.fChannel.isDone()) {
            this.fChannel = this.bootstrap.connect(this.destination.socketAddress());
            this.channel = this.fChannel.channel();
            this.fChannel.addListener(channelFuture -> {
                if (!channelFuture.isSuccess()) {
                    channelFuture.channel().eventLoop().schedule(this::tryConnect, 250L, TimeUnit.MILLISECONDS);
                } else {
                    this.log.info("Connected: " + channelFuture.channel());
                    channelFuture.channel().closeFuture().addListener(future -> {
                        this.log.warn(String.format("Lost connection to: %s (%s)", this.destination, this.channel.remoteAddress()));
                        channelFuture.channel().eventLoop().schedule(this::tryConnect, 250L, TimeUnit.MILLISECONDS);
                    });
                }
            });
        }
    }

    public synchronized void dispose() {
        this.disposed = true;
        this.channel.close();
    }

    public Future<Void> send(Object obj) {
        if (this.disposed) {
            throw new IllegalStateException("sending on disposed channel");
        }
        if (this.channel.isActive()) {
            return this.channel.writeAndFlush(obj);
        }
        Promise<?> newPromise = this.eventLoop.newPromise();
        deferredWrite(obj, this.fChannel, newPromise, true);
        return newPromise;
    }

    private void deferredWrite(Object obj, ChannelFuture channelFuture, Promise<?> promise, boolean z) {
        channelFuture.addListener(channelFuture2 -> {
            if (channelFuture2.isSuccess()) {
                channelFuture2.channel().writeAndFlush(obj).addListener(future -> {
                    promise.setSuccess((Object) null);
                });
            } else if (!z) {
                promise.setFailure(channelFuture2.cause());
            } else {
                tryConnect();
                deferredWrite(obj, this.fChannel, promise, false);
            }
        });
    }
}
