package org.neo4j.causalclustering.messaging;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Promise;
import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.helpers.SocketAddress;
import org.neo4j.kernel.impl.util.CappedLogger;
import org.neo4j.logging.Log;

/* loaded from: input_file:org/neo4j/causalclustering/messaging/ReconnectingChannel.class */
public class ReconnectingChannel implements Channel {
    public static final AttributeKey<ProtocolStack> PROTOCOL_STACK_KEY = AttributeKey.valueOf("PROTOCOL_STACK");
    private final Log log;
    private final Bootstrap bootstrap;
    private final EventLoop eventLoop;
    private final SocketAddress destination;
    private final TimeoutStrategy connectionBackoffStrategy;
    private volatile io.netty.channel.Channel channel;
    private volatile ChannelFuture fChannel;
    private volatile boolean disposed;
    private TimeoutStrategy.Timeout connectionBackoff;
    private CappedLogger cappedLogger;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReconnectingChannel(Bootstrap bootstrap, EventLoop eventLoop, SocketAddress socketAddress, Log log) {
        this(bootstrap, eventLoop, socketAddress, log, new ExponentialBackoffStrategy(100L, 1600L, TimeUnit.MILLISECONDS));
    }

    private ReconnectingChannel(Bootstrap bootstrap, EventLoop eventLoop, SocketAddress socketAddress, Log log, TimeoutStrategy timeoutStrategy) {
        this.bootstrap = bootstrap;
        this.eventLoop = eventLoop;
        this.destination = socketAddress;
        this.log = log;
        this.cappedLogger = new CappedLogger(log).setTimeLimit(20L, TimeUnit.SECONDS, Clock.systemUTC());
        this.connectionBackoffStrategy = timeoutStrategy;
        this.connectionBackoff = timeoutStrategy.newTimeout();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        tryConnect();
    }

    private synchronized void tryConnect() {
        if (this.disposed) {
            return;
        }
        if (this.fChannel == null || this.fChannel.isDone()) {
            this.fChannel = this.bootstrap.connect(this.destination.socketAddress());
            this.channel = this.fChannel.channel();
            this.fChannel.addListener(channelFuture -> {
                if (channelFuture.isSuccess()) {
                    this.log.info("Connected: " + channelFuture.channel());
                    channelFuture.channel().closeFuture().addListener(future -> {
                        this.log.warn(String.format("Lost connection to: %s (%s)", this.destination, this.channel.remoteAddress()));
                        this.connectionBackoff = this.connectionBackoffStrategy.newTimeout();
                        channelFuture.channel().eventLoop().schedule(this::tryConnect, 0L, TimeUnit.MILLISECONDS);
                    });
                } else {
                    long millis = this.connectionBackoff.getMillis();
                    this.cappedLogger.warn("Failed to connect to: " + this.destination.socketAddress() + ". Retrying in " + millis + " ms");
                    channelFuture.channel().eventLoop().schedule(this::tryConnect, millis, TimeUnit.MILLISECONDS);
                    this.connectionBackoff.increment();
                }
            });
        }
    }

    @Override // org.neo4j.causalclustering.messaging.Channel
    public synchronized void dispose() {
        this.disposed = true;
        this.channel.close();
    }

    @Override // org.neo4j.causalclustering.messaging.Channel
    public boolean isDisposed() {
        return this.disposed;
    }

    @Override // org.neo4j.causalclustering.messaging.Channel
    public boolean isOpen() {
        return this.channel.isOpen();
    }

    @Override // org.neo4j.causalclustering.messaging.Channel
    public Future<Void> write(Object obj) {
        return write(obj, false);
    }

    @Override // org.neo4j.causalclustering.messaging.Channel
    public Future<Void> writeAndFlush(Object obj) {
        return write(obj, true);
    }

    private Future<Void> write(Object obj, boolean z) {
        if (this.disposed) {
            throw new IllegalStateException("sending on disposed channel");
        }
        if (this.channel.isActive()) {
            return z ? this.channel.writeAndFlush(obj) : this.channel.write(obj);
        }
        Promise<Void> newPromise = this.eventLoop.newPromise();
        deferredWrite(obj, this.fChannel, newPromise, true, z ? (channel, obj2) -> {
            chain(channel.writeAndFlush(obj), newPromise);
        } : (channel2, obj3) -> {
            chain(channel2.write(obj), newPromise);
        });
        return newPromise;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void chain(ChannelFuture channelFuture, Promise<Void> promise) {
        channelFuture.addListener(future -> {
            if (future.isSuccess()) {
                promise.setSuccess(channelFuture.get());
            } else {
                promise.setFailure(channelFuture.cause());
            }
        });
    }

    private void deferredWrite(Object obj, ChannelFuture channelFuture, Promise<Void> promise, boolean z, BiConsumer<io.netty.channel.Channel, Object> biConsumer) {
        channelFuture.addListener(channelFuture2 -> {
            if (channelFuture2.isSuccess()) {
                biConsumer.accept(channelFuture2.channel(), obj);
            } else if (!z) {
                promise.setFailure(channelFuture2.cause());
            } else {
                tryConnect();
                deferredWrite(obj, this.fChannel, promise, false, biConsumer);
            }
        });
    }

    public Optional<ProtocolStack> installedProtocolStack() {
        return Optional.ofNullable(this.channel.attr(PROTOCOL_STACK_KEY).get());
    }

    public String toString() {
        return "ReconnectingChannel{channel=" + this.channel + ", disposed=" + this.disposed + '}';
    }
}
