package org.apache.zookeeper.server.quorum;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.net.ssl.SSLSocket;
import org.apache.zookeeper.common.NetUtils;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ConfigUtils;
import org.apache.zookeeper.util.CircularBlockingQueue;
import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/zookeeper-3.9.3.jar:org/apache/zookeeper/server/quorum/QuorumCnxManager.class */
public class QuorumCnxManager {
    // presumably the capacity of recvQueue, which is built with the same value (100) — TODO confirm
    static final int RECV_CAPACITY = 100;
    // not referenced in the visible part of this file; presumably the per-peer send queue capacity — TODO confirm
    static final int SEND_CAPACITY = 1;
    // Equal to the largest packet length prefix that RecvWorker accepts from a peer.
    static final int PACKETMAXSIZE = 524288;
    // Protocol versions accepted by InitialMessage.parse; any other value is rejected there.
    public static final long PROTOCOL_VERSION_V1 = -65536;
    public static final long PROTOCOL_VERSION_V2 = -65535;
    // Equal to the largest address-list byte length that InitialMessage.parse accepts.
    public static final int maxBuffer = 2048;
    // Timeout handed to Socket.connect in initiateConnection; the constructor sets it to 5000
    // unless the "zookeeper.cnxTimeout" system property overrides it.
    private int cnxTO;
    // The following five fields are copied verbatim from the constructor arguments.
    final QuorumPeer self;
    final long mySid;
    final int socketTimeout;
    final Map<Long, QuorumPeer.QuorumServer> view;
    final boolean listenOnAllIPs;
    // Built by initializeConnectionExecutor; its users are outside the visible part of this file.
    private ThreadPoolExecutor connectionExecutor;
    // SASL helpers supplied by the constructor; their use is outside the visible part of this file.
    private QuorumAuthServer authServer;
    private QuorumAuthLearner authLearner;
    // When true, the Listener hands accepted sockets to receiveConnectionAsync instead of receiveConnection.
    private boolean quorumSaslAuthEnabled;
    // Created (but not started) by the constructor, which also names it "QuorumPeerListener".
    public final Listener listener;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) QuorumCnxManager.class);
    // Produces plain, unconnected sockets.
    static final Supplier<Socket> DEFAULT_SOCKET_FACTORY = () -> {
        return new Socket();
    };
    // Factory used by initiateConnection for non-SSL sockets; replaceable through setSocketFactory.
    private static Supplier<Socket> SOCKET_FACTORY = DEFAULT_SOCKET_FACTORY;
    // not referenced in the visible part of this file — NOTE(review): confirm purpose against the rest of the class
    private AtomicLong observerCounter = new AtomicLong(-1);
    // Server ids with a connection attempt in flight; QuorumConnectionReqThread removes its sid when the attempt ends.
    private final Set<Long> inprogressConnections = Collections.synchronizedSet(new HashSet());
    // not referenced in the visible part of this file — NOTE(review): confirm purpose against the rest of the class
    private AtomicInteger connectionThreadCnt = new AtomicInteger(0);
    // Polled by the listener and worker loops; once true they stop iterating.
    volatile boolean shutdown = false;
    // Incremented when a SendWorker/RecvWorker starts running and decremented in their finish() methods.
    private AtomicInteger threadCnt = new AtomicInteger(0);
    // Read once from the "zookeeper.tcpKeepAlive" system property; presumably applied in setSockOpts — TODO confirm
    private final boolean tcpKeepAlive = Boolean.getBoolean("zookeeper.tcpKeepAlive");
    // Messages received from peers; RecvWorker feeds it through addToRecvQueue.
    public final BlockingQueue<Message> recvQueue = new CircularBlockingQueue(100);
    // Per-sid queue of outgoing buffers; each SendWorker polls the queue registered under its sid.
    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();
    // Per-sid SendWorker; a worker removes its own entry in finish().
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap = new ConcurrentHashMap<>();
    // Per-sid last buffer polled by a SendWorker; a starting worker re-sends it when its queue is missing or empty.
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent = new ConcurrentHashMap<>();

    /* loaded from: input_file:META-INF/bundled-dependencies/zookeeper-3.9.3.jar:org/apache/zookeeper/server/quorum/QuorumCnxManager$InitialMessage.class */
    /**
     * The first message read from a peer on a new election connection: the peer's server id
     * followed by the list of election addresses it advertises.
     */
    public static class InitialMessage {
        /** Server id read from the message. */
        public Long sid;
        /** Election addresses read from the message; wildcard addresses are left out by {@link #parse}. */
        public List<InetSocketAddress> electionAddr;

        /** Raised when the initial message is malformed or carries an unknown protocol version. */
        public static class InitialMessageException extends Exception {
            InitialMessageException(String str, Object... objArr) {
                super(String.format(str, objArr));
            }
        }

        InitialMessage(Long l, List<InetSocketAddress> list) {
            this.sid = l;
            this.electionAddr = list;
        }

        /**
         * Reads the remainder of an initial message whose protocol version has already been consumed.
         *
         * <p>Wire layout read here: an 8-byte sid, a 4-byte length, then {@code length} bytes of
         * UTF-8 text holding {@code host:port} entries separated by {@code '|'}.
         *
         * @param l the protocol version already read from the stream
         * @param dataInputStream stream positioned right after the protocol version
         * @return the parsed message; entries whose host is a wildcard address are skipped
         * @throws InitialMessageException if the version is unknown, the length is outside
         *         {@code 1..maxBuffer}, the stream ends before the announced length, or an
         *         address entry is malformed
         * @throws IOException if reading from the stream fails
         */
        public static InitialMessage parse(Long l, DataInputStream dataInputStream) throws InitialMessageException, IOException {
            long protocolVersion = l.longValue();
            if (protocolVersion != QuorumCnxManager.PROTOCOL_VERSION_V1 && protocolVersion != QuorumCnxManager.PROTOCOL_VERSION_V2) {
                throw new InitialMessageException("Got unrecognized protocol version %s", l);
            }
            Long remoteSid = Long.valueOf(dataInputStream.readLong());
            int length = dataInputStream.readInt();
            if (length <= 0 || length > QuorumCnxManager.maxBuffer) {
                throw new InitialMessageException("Unreasonable buffer length: %s", Integer.valueOf(length));
            }

            // A single read() may legitimately return fewer bytes than requested (for example
            // when the payload spans several TCP segments), so keep reading until the buffer is
            // full or the stream ends.
            byte[] payload = new byte[length];
            int totalRead = 0;
            while (totalRead < length) {
                int count = dataInputStream.read(payload, totalRead, length - totalRead);
                if (count < 0) {
                    break;
                }
                totalRead += count;
            }
            if (totalRead != length) {
                throw new InitialMessageException("Read only %s bytes out of %s sent by server %s", Integer.valueOf(totalRead), Integer.valueOf(length), remoteSid);
            }

            String[] entries = new String(payload, StandardCharsets.UTF_8).split("\\|");
            List<InetSocketAddress> addresses = new ArrayList<>(entries.length);
            for (String entry : entries) {
                String[] hostAndPort;
                try {
                    hostAndPort = ConfigUtils.getHostAndPort(entry);
                } catch (QuorumPeerConfig.ConfigException e) {
                    throw new InitialMessageException("Badly formed address: %s", entry);
                }
                if (hostAndPort.length != 2) {
                    throw new InitialMessageException("Badly formed address: %s", entry);
                }
                int port;
                try {
                    port = Integer.parseInt(hostAndPort[1]);
                } catch (NumberFormatException e) {
                    throw new InitialMessageException("Bad port number: %s", hostAndPort[1]);
                }
                // InetSocketAddress rejects out-of-range ports with an unchecked
                // IllegalArgumentException; report that as a malformed message instead so the
                // caller's InitialMessageException handling covers it.
                if (port < 0 || port > 0xFFFF) {
                    throw new InitialMessageException("Bad port number: %s", hostAndPort[1]);
                }
                if (!isWildcardAddress(hostAndPort[0])) {
                    addresses.add(new InetSocketAddress(hostAndPort[0], port));
                }
            }
            return new InitialMessage(remoteSid, addresses);
        }

        /**
         * Tells whether {@code str} resolves to a wildcard ("any local") address.
         *
         * @param str host name or literal IP address; resolved with {@link InetAddress#getByName}
         * @return {@code true} for a wildcard address, {@code false} otherwise or when the host
         *         cannot be resolved
         */
        static boolean isWildcardAddress(String str) {
            try {
                return InetAddress.getByName(str).isAnyLocalAddress();
            } catch (UnknownHostException e) {
                return false;
            }
        }

        @Override
        public String toString() {
            return "InitialMessage{sid=" + this.sid + ", electionAddr=" + this.electionAddr + '}';
        }
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/zookeeper-3.9.3.jar:org/apache/zookeeper/server/quorum/QuorumCnxManager$Listener.class */
    public class Listener extends ZooKeeperThread {
        private static final String ELECTION_PORT_BIND_RETRY = "zookeeper.electionPortBindRetry";
        private static final int DEFAULT_PORT_BIND_MAX_RETRY = 3;
        private final int portBindMaxRetry;
        private Runnable socketBindErrorHandler;
        private List<ListenerHandler> listenerHandlers;
        private final AtomicBoolean socketException;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:META-INF/bundled-dependencies/zookeeper-3.9.3.jar:org/apache/zookeeper/server/quorum/QuorumCnxManager$Listener$ListenerHandler.class */
        public class ListenerHandler implements Runnable, Closeable {
            private ServerSocket serverSocket;
            private InetSocketAddress address;
            private boolean portUnification;
            private boolean sslQuorum;
            private CountDownLatch latch;

            ListenerHandler(InetSocketAddress inetSocketAddress, boolean z, boolean z2, CountDownLatch countDownLatch) {
                this.address = inetSocketAddress;
                this.portUnification = z;
                this.sslQuorum = z2;
                this.latch = countDownLatch;
            }

            @Override // java.lang.Runnable
            public void run() {
                try {
                    Thread.currentThread().setName("ListenerHandler-" + this.address);
                    acceptConnections();
                    try {
                        close();
                    } catch (IOException e) {
                        QuorumCnxManager.LOG.warn("Exception when shutting down listener: ", (Throwable) e);
                    }
                } catch (Exception e2) {
                    QuorumCnxManager.LOG.error("Unexpected error ", (Throwable) e2);
                } finally {
                    this.latch.countDown();
                }
            }

            @Override // java.io.Closeable, java.lang.AutoCloseable
            public synchronized void close() throws IOException {
                if (this.serverSocket == null || this.serverSocket.isClosed()) {
                    return;
                }
                QuorumCnxManager.LOG.debug("Trying to close listeners: {}", this.serverSocket);
                this.serverSocket.close();
            }

            private void acceptConnections() {
                int i = 0;
                Socket socket = null;
                while (!QuorumCnxManager.this.shutdown && (Listener.this.portBindMaxRetry == 0 || i < Listener.this.portBindMaxRetry)) {
                    try {
                        this.serverSocket = createNewServerSocket();
                        QuorumCnxManager.LOG.info("{} is accepting connections now, my election bind port: {}", Long.valueOf(QuorumCnxManager.this.mySid), this.address.toString());
                        while (!QuorumCnxManager.this.shutdown) {
                            try {
                                socket = this.serverSocket.accept();
                                QuorumCnxManager.this.setSockOpts(socket);
                                QuorumCnxManager.LOG.info("Received connection request from {}", socket.getRemoteSocketAddress());
                                if (QuorumCnxManager.this.quorumSaslAuthEnabled) {
                                    QuorumCnxManager.this.receiveConnectionAsync(socket);
                                } else {
                                    QuorumCnxManager.this.receiveConnection(socket);
                                }
                                i = 0;
                            } catch (SocketTimeoutException e) {
                                QuorumCnxManager.LOG.warn("The socket is listening for the election accepted and it timed out unexpectedly, but will retry.see ZOOKEEPER-2836");
                            }
                        }
                    } catch (IOException e2) {
                        if (QuorumCnxManager.this.shutdown) {
                            break;
                        }
                        QuorumCnxManager.LOG.error("Exception while listening to address {}", this.address, e2);
                        if (e2 instanceof SocketException) {
                            Listener.this.socketException.set(true);
                        }
                        i++;
                        try {
                            close();
                            Thread.sleep(1000L);
                        } catch (IOException e3) {
                            QuorumCnxManager.LOG.error("Error closing server socket", (Throwable) e3);
                        } catch (InterruptedException e4) {
                            QuorumCnxManager.LOG.error("Interrupted while sleeping. Ignoring exception", (Throwable) e4);
                        }
                        QuorumCnxManager.this.closeSocket(socket);
                    }
                }
                if (QuorumCnxManager.this.shutdown) {
                    return;
                }
                QuorumCnxManager.LOG.error("Leaving listener thread for address {} after {} errors. Use {} property to increase retry count.", NetUtils.formatInetAddr(this.address), Integer.valueOf(i), Listener.ELECTION_PORT_BIND_RETRY);
            }

            private ServerSocket createNewServerSocket() throws IOException {
                ServerSocket serverSocket;
                if (this.portUnification) {
                    QuorumCnxManager.LOG.info("Creating TLS-enabled quorum server socket");
                    serverSocket = new UnifiedServerSocket(QuorumCnxManager.this.self.getX509Util(), true);
                } else if (this.sslQuorum) {
                    QuorumCnxManager.LOG.info("Creating TLS-only quorum server socket");
                    serverSocket = new UnifiedServerSocket(QuorumCnxManager.this.self.getX509Util(), false);
                } else {
                    serverSocket = new ServerSocket();
                }
                serverSocket.setReuseAddress(true);
                this.address = new InetSocketAddress(this.address.getHostString(), this.address.getPort());
                serverSocket.bind(this.address);
                return serverSocket;
            }
        }

        public Listener() {
            super("ListenerThread");
            this.socketBindErrorHandler = () -> {
                ServiceUtils.requestSystemExit(ExitCode.UNABLE_TO_BIND_QUORUM_PORT.getValue());
            };
            this.socketException = new AtomicBoolean(false);
            Integer integer = Integer.getInteger(ELECTION_PORT_BIND_RETRY, 3);
            if (integer.intValue() >= 0) {
                QuorumCnxManager.LOG.info("Election port bind maximum retries is {}", integer.intValue() == 0 ? "infinite" : integer);
                this.portBindMaxRetry = integer.intValue();
            } else {
                QuorumCnxManager.LOG.info("'{}' contains invalid value: {}(must be >= 0). Use default value of {} instead.", ELECTION_PORT_BIND_RETRY, integer, 3);
                this.portBindMaxRetry = 3;
            }
        }

        void setSocketBindErrorHandler(Runnable runnable) {
            this.socketBindErrorHandler = runnable;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            if (!QuorumCnxManager.this.shutdown) {
                QuorumCnxManager.LOG.debug("Listener thread started, myId: {}", Long.valueOf(QuorumCnxManager.this.self.getMyId()));
                Set<InetSocketAddress> wildcardAddresses = QuorumCnxManager.this.self.getQuorumListenOnAllIPs() ? QuorumCnxManager.this.self.getElectionAddress().getWildcardAddresses() : QuorumCnxManager.this.self.getElectionAddress().getAllAddresses();
                CountDownLatch countDownLatch = new CountDownLatch(wildcardAddresses.size());
                this.listenerHandlers = (List) wildcardAddresses.stream().map(inetSocketAddress -> {
                    return new ListenerHandler(inetSocketAddress, QuorumCnxManager.this.self.shouldUsePortUnification(), QuorumCnxManager.this.self.isSslQuorum(), countDownLatch);
                }).collect(Collectors.toList());
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(wildcardAddresses.size());
                try {
                    List<ListenerHandler> list = this.listenerHandlers;
                    Objects.requireNonNull(newFixedThreadPool);
                    list.forEach((v1) -> {
                        r1.submit(v1);
                    });
                    newFixedThreadPool.shutdown();
                    try {
                        try {
                            countDownLatch.await();
                            Iterator<ListenerHandler> it = this.listenerHandlers.iterator();
                            while (it.hasNext()) {
                                try {
                                    it.next().close();
                                } catch (IOException e) {
                                    QuorumCnxManager.LOG.debug("Error closing server socket", (Throwable) e);
                                }
                            }
                        } catch (InterruptedException e2) {
                            QuorumCnxManager.LOG.error("Interrupted while sleeping. Ignoring exception", (Throwable) e2);
                            Iterator<ListenerHandler> it2 = this.listenerHandlers.iterator();
                            while (it2.hasNext()) {
                                try {
                                    it2.next().close();
                                } catch (IOException e3) {
                                    QuorumCnxManager.LOG.debug("Error closing server socket", (Throwable) e3);
                                }
                            }
                        }
                    } catch (Throwable th) {
                        Iterator<ListenerHandler> it3 = this.listenerHandlers.iterator();
                        while (it3.hasNext()) {
                            try {
                                it3.next().close();
                            } catch (IOException e4) {
                                QuorumCnxManager.LOG.debug("Error closing server socket", (Throwable) e4);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th2) {
                    newFixedThreadPool.shutdown();
                    throw th2;
                }
            }
            QuorumCnxManager.LOG.info("Leaving listener");
            if (QuorumCnxManager.this.shutdown) {
                return;
            }
            QuorumCnxManager.LOG.error("As I'm leaving the listener thread, I won't be able to participate in leader election any longer: {}", QuorumCnxManager.this.self.getElectionAddress().getAllAddresses().stream().map(NetUtils::formatInetAddr).collect(Collectors.joining("|")));
            if (this.socketException.get()) {
                this.socketBindErrorHandler.run();
            }
        }

        void halt() {
            QuorumCnxManager.LOG.debug("Halt called: Trying to close listeners");
            if (this.listenerHandlers != null) {
                QuorumCnxManager.LOG.debug("Closing listener: {}", Long.valueOf(QuorumCnxManager.this.mySid));
                Iterator<ListenerHandler> it = this.listenerHandlers.iterator();
                while (it.hasNext()) {
                    try {
                        it.next().close();
                    } catch (IOException e) {
                        QuorumCnxManager.LOG.warn("Exception when shutting down listener: ", (Throwable) e);
                    }
                }
            }
        }
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/zookeeper-3.9.3.jar:org/apache/zookeeper/server/quorum/QuorumCnxManager$Message.class */
    /** Pairs a message buffer with the server id it is associated with. */
    public static class Message {
        /** Server id attached to the message. */
        long sid;
        /** Message payload. */
        ByteBuffer buffer;

        Message(ByteBuffer payload, long serverId) {
            this.sid = serverId;
            this.buffer = payload;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/zookeeper-3.9.3.jar:org/apache/zookeeper/server/quorum/QuorumCnxManager$QuorumConnectionReceiverThread.class */
    /** Thread that runs {@code receiveConnection} for a single accepted socket. */
    public class QuorumConnectionReceiverThread extends ZooKeeperThread {
        /** The accepted socket to process. */
        private final Socket sock;

        /**
         * @param accepted socket to process; its remote address becomes part of the thread name
         */
        QuorumConnectionReceiverThread(Socket accepted) {
            super("QuorumConnectionReceiverThread-" + accepted.getRemoteSocketAddress());
            sock = accepted;
        }

        /** Delegates to the enclosing manager's {@code receiveConnection}. */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            final Socket accepted = this.sock;
            QuorumCnxManager.this.receiveConnection(accepted);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/zookeeper-3.9.3.jar:org/apache/zookeeper/server/quorum/QuorumCnxManager$QuorumConnectionReqThread.class */
    /**
     * Thread that runs {@code initiateConnection} towards one server and afterwards removes
     * that server's id from the set of in-progress connections.
     */
    public class QuorumConnectionReqThread extends ZooKeeperThread {
        /** Election addresses of the server to connect to. */
        final MultipleAddresses electionAddr;
        /** Id of the server to connect to. */
        final Long sid;

        /**
         * @param targetAddresses election addresses of the target server
         * @param targetSid id of the target server; also part of the thread name
         */
        QuorumConnectionReqThread(MultipleAddresses targetAddresses, Long targetSid) {
            super("QuorumConnectionReqThread-" + targetSid);
            sid = targetSid;
            electionAddr = targetAddresses;
        }

        /** Attempts the connection; the in-progress marker is cleared even if the attempt throws. */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            final Long target = this.sid;
            try {
                QuorumCnxManager.this.initiateConnection(this.electionAddr, target);
            } finally {
                QuorumCnxManager.this.inprogressConnections.remove(target);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/zookeeper-3.9.3.jar:org/apache/zookeeper/server/quorum/QuorumCnxManager$RecvWorker.class */
    /**
     * Thread that reads length-prefixed packets from one peer's socket and hands them to the
     * enclosing manager's receive queue. When it stops it also stops the paired
     * {@link SendWorker} and closes the socket.
     */
    public class RecvWorker extends ZooKeeperThread {
        /** Id of the peer on the other end of the socket. */
        Long sid;
        Socket sock;
        /** Cleared by {@link #finish()} (or by the constructor on failure) to stop the read loop. */
        volatile boolean running;
        final DataInputStream din;
        /** The sender paired with this receiver; finished when this receiver leaves its loop. */
        final SendWorker sw;

        /**
         * @param socket connected socket to read from; its read timeout is disabled here
         * @param dataInputStream stream to read packets from
         * @param l id of the remote peer
         * @param sendWorker the sender sharing the same connection
         */
        RecvWorker(Socket socket, DataInputStream dataInputStream, Long l, SendWorker sendWorker) {
            super("RecvWorker:" + l);
            this.running = true;
            this.sid = l;
            this.sock = socket;
            this.sw = sendWorker;
            this.din = dataInputStream;
            try {
                // 0 disables the read timeout so readInt() may block indefinitely between packets.
                socket.setSoTimeout(0);
            } catch (IOException e) {
                QuorumCnxManager.LOG.error("Error while accessing socket for {}", l, e);
                QuorumCnxManager.this.closeSocket(socket);
                this.running = false;
            }
        }

        /**
         * Stops this worker: clears {@code running}, interrupts the thread and decrements the
         * manager's thread counter. Calling it again is a no-op.
         *
         * @return the value of {@code running} after the call, i.e. {@code false}
         */
        synchronized boolean finish() {
            QuorumCnxManager.LOG.debug("RecvWorker.finish called. sid: {}. myId: {}", this.sid, Long.valueOf(QuorumCnxManager.this.mySid));
            if (!this.running) {
                return this.running;
            }
            this.running = false;
            interrupt();
            QuorumCnxManager.this.threadCnt.decrementAndGet();
            return this.running;
        }

        /**
         * Reads packets until stopped, the manager shuts down, or the connection breaks.
         * Each packet is a 4-byte length in {@code 1..PACKETMAXSIZE} followed by that many bytes.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            QuorumCnxManager.this.threadCnt.incrementAndGet();
            try {
                QuorumCnxManager.LOG.debug("RecvWorker thread towards {} started. myId: {}", this.sid, Long.valueOf(QuorumCnxManager.this.mySid));
                while (this.running && !QuorumCnxManager.this.shutdown && this.sock != null) {
                    int length = this.din.readInt();
                    if (length <= 0 || length > QuorumCnxManager.PACKETMAXSIZE) {
                        throw new IOException("Received packet with invalid packet: " + length);
                    }
                    byte[] packet = new byte[length];
                    this.din.readFully(packet, 0, length);
                    QuorumCnxManager.this.addToRecvQueue(new Message(ByteBuffer.wrap(packet), this.sid.longValue()));
                }
            } catch (Exception e) {
                QuorumCnxManager.LOG.warn("Connection broken for id {}, my id = {}", this.sid, Long.valueOf(QuorumCnxManager.this.mySid), e);
            } finally {
                // Runs exactly once on every exit path: stop the paired sender and release the socket.
                QuorumCnxManager.LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", this.sid, Long.valueOf(QuorumCnxManager.this.mySid));
                this.sw.finish();
                QuorumCnxManager.this.closeSocket(this.sock);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/zookeeper-3.9.3.jar:org/apache/zookeeper/server/quorum/QuorumCnxManager$SendWorker.class */
    /**
     * Thread that takes buffers from the send queue registered for one peer and writes them,
     * length-prefixed, to that peer's socket.
     */
    public class SendWorker extends ZooKeeperThread {
        /** Id of the peer on the other end of the socket. */
        Long sid;
        Socket sock;
        /** The receiver paired with this sender, if one has been set. */
        RecvWorker recvWorker;
        /** Cleared by {@link #finish()} (or by the constructor on failure) to stop the send loop. */
        volatile boolean running;
        DataOutputStream dout;
        /** Guards against starting more than one reachability check at a time. */
        AtomicBoolean ongoingAsyncValidation;

        /**
         * @param socket connected socket to write to; closed here if its output stream is unavailable
         * @param l id of the remote peer
         */
        SendWorker(Socket socket, Long l) {
            super("SendWorker:" + l);
            this.running = true;
            this.ongoingAsyncValidation = new AtomicBoolean(false);
            this.sid = l;
            this.sock = socket;
            this.recvWorker = null;
            try {
                this.dout = new DataOutputStream(socket.getOutputStream());
            } catch (IOException e) {
                QuorumCnxManager.LOG.error("Unable to access socket output stream", (Throwable) e);
                QuorumCnxManager.this.closeSocket(socket);
                this.running = false;
            }
            QuorumCnxManager.LOG.debug("Address of remote peer: {}", this.sid);
        }

        synchronized void setRecv(RecvWorker recvWorker) {
            this.recvWorker = recvWorker;
        }

        synchronized RecvWorker getRecvWorker() {
            return this.recvWorker;
        }

        /**
         * Stops this worker: closes the socket, interrupts the thread, finishes the paired
         * receiver, removes this worker from {@code senderWorkerMap} (only if it is still the
         * registered one) and decrements the manager's thread counter. Calling it again is a no-op.
         *
         * @return the value of {@code running} after the call, i.e. {@code false}
         */
        synchronized boolean finish() {
            QuorumCnxManager.LOG.debug("Calling SendWorker.finish for {}", this.sid);
            if (!this.running) {
                return this.running;
            }
            this.running = false;
            QuorumCnxManager.this.closeSocket(this.sock);
            interrupt();
            if (this.recvWorker != null) {
                this.recvWorker.finish();
            }
            QuorumCnxManager.LOG.debug("Removing entry from senderWorkerMap sid={}", this.sid);
            QuorumCnxManager.this.senderWorkerMap.remove(this.sid, this);
            QuorumCnxManager.this.threadCnt.decrementAndGet();
            return this.running;
        }

        /**
         * Writes the buffer's content from index 0 up to its capacity, preceded by that length.
         * The buffer's position is changed by this call. If the buffer's limit is below its
         * capacity the error is logged and nothing is written.
         *
         * @param byteBuffer message to send
         * @throws IOException if writing to the socket fails
         */
        synchronized void send(ByteBuffer byteBuffer) throws IOException {
            byte[] payload = new byte[byteBuffer.capacity()];
            try {
                byteBuffer.position(0);
                byteBuffer.get(payload);
            } catch (BufferUnderflowException e) {
                QuorumCnxManager.LOG.error("BufferUnderflowException ", (Throwable) e);
                return;
            }
            // Write the copied bytes rather than byteBuffer.array(): the backing array may be
            // larger than the buffer or start at an offset (sliced buffers), and is unavailable
            // for direct or read-only buffers. The copy always matches the length prefix.
            this.dout.writeInt(payload.length);
            this.dout.write(payload);
            this.dout.flush();
        }

        /**
         * Re-sends the last message when the send queue is missing or empty, then forwards
         * queued messages until stopped, the manager shuts down, or the channel fails.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            QuorumCnxManager.this.threadCnt.incrementAndGet();
            try {
                BlockingQueue<ByteBuffer> pending = QuorumCnxManager.this.queueSendMap.get(this.sid);
                if (pending == null || QuorumCnxManager.this.isSendQueueEmpty(pending)) {
                    ByteBuffer lastMessage = QuorumCnxManager.this.lastMessageSent.get(this.sid);
                    if (lastMessage != null) {
                        QuorumCnxManager.LOG.debug("Attempting to send lastMessage to sid={}", this.sid);
                        send(lastMessage);
                    }
                }
            } catch (IOException e) {
                QuorumCnxManager.LOG.error("Failed to send last message. Shutting down thread.", (Throwable) e);
                finish();
            }
            QuorumCnxManager.LOG.debug("SendWorker thread started towards {}. myId: {}", this.sid, Long.valueOf(QuorumCnxManager.this.mySid));
            try {
                while (this.running && !QuorumCnxManager.this.shutdown && this.sock != null) {
                    try {
                        BlockingQueue<ByteBuffer> queue = QuorumCnxManager.this.queueSendMap.get(this.sid);
                        if (queue == null) {
                            QuorumCnxManager.LOG.error("No queue of incoming messages for server {}", this.sid);
                            break;
                        }
                        ByteBuffer message = QuorumCnxManager.this.pollSendQueue(queue, 1000L, TimeUnit.MILLISECONDS);
                        if (message != null) {
                            QuorumCnxManager.this.lastMessageSent.put(this.sid, message);
                            send(message);
                        }
                    } catch (InterruptedException e) {
                        // finish() interrupts this thread after clearing 'running', so the loop
                        // condition decides whether to continue.
                        QuorumCnxManager.LOG.warn("Interrupted while waiting for message on queue", (Throwable) e);
                    }
                }
            } catch (Exception e) {
                // Any other failure (typically an IOException from send) means the channel is
                // unusable: leave the loop instead of retrying on a dead socket.
                QuorumCnxManager.LOG.warn("Exception when using channel: for id {} my id = {}", this.sid, Long.valueOf(QuorumCnxManager.this.mySid), e);
            }
            finish();
            QuorumCnxManager.LOG.warn("Send worker leaving thread id {} my id = {}", this.sid, Long.valueOf(QuorumCnxManager.this.self.getMyId()));
        }

        /**
         * Starts a background thread that checks whether the socket's remote address is reachable
         * within 500 ms and finishes this worker when it is not. Does nothing when a check is
         * already marked as running.
         */
        public void asyncValidateIfSocketIsStillReachable() {
            if (this.ongoingAsyncValidation.compareAndSet(false, true)) {
                new Thread(() -> {
                    QuorumCnxManager.LOG.debug("validate if destination address is reachable for sid {}", this.sid);
                    if (this.sock != null) {
                        // getInetAddress() returns null for a socket that is not connected; treat
                        // that like an unreachable destination.
                        InetAddress destination = this.sock.getInetAddress();
                        try {
                            if (destination != null && destination.isReachable(500)) {
                                QuorumCnxManager.LOG.debug("destination address {} is reachable for sid {}", destination, this.sid);
                                this.ongoingAsyncValidation.set(false);
                                return;
                            }
                        } catch (IOException ignored) {
                            // A failed probe is handled as "not reachable" below.
                        }
                        // Log the reference itself (not destination.toString()) so a null address
                        // cannot abort this thread before finish() runs.
                        QuorumCnxManager.LOG.warn("destination address {} not reachable anymore, shutting down the SendWorker for sid {}", destination, this.sid);
                        finish();
                    }
                }).start();
            } else {
                QuorumCnxManager.LOG.debug("validation of destination address for sid {} is skipped (it is already running)", this.sid);
            }
        }
    }

    /**
     * Replaces the factory that {@code initiateConnection} uses to create non-SSL sockets.
     *
     * @param supplier the new socket factory
     */
    static void setSocketFactory(Supplier<Socket> supplier) {
        QuorumCnxManager.SOCKET_FACTORY = supplier;
    }

    /**
     * Builds the manager, its connection executor and its (not yet started) listener thread.
     *
     * <p>The connect timeout defaults to 5000 and is replaced by the integer value of the
     * {@code zookeeper.cnxTimeout} system property when that property is set.
     *
     * @param quorumPeer stored as {@code self}
     * @param j stored as {@code mySid}
     * @param map stored as {@code view}
     * @param quorumAuthServer stored as {@code authServer}
     * @param quorumAuthLearner stored as {@code authLearner}
     * @param i stored as {@code socketTimeout}
     * @param z stored as {@code listenOnAllIPs}
     * @param i2 maximum pool size of the connection executor
     * @param z2 stored as {@code quorumSaslAuthEnabled}
     * @throws NumberFormatException if {@code zookeeper.cnxTimeout} is set but is not an integer
     */
    public QuorumCnxManager(QuorumPeer quorumPeer, long j, Map<Long, QuorumPeer.QuorumServer> map, QuorumAuthServer quorumAuthServer, QuorumAuthLearner quorumAuthLearner, int i, boolean z, int i2, boolean z2) {
        final String cnxTimeoutProperty = System.getProperty("zookeeper.cnxTimeout");
        this.cnxTO = (cnxTimeoutProperty == null) ? 5000 : Integer.parseInt(cnxTimeoutProperty);

        this.self = quorumPeer;
        this.mySid = j;
        this.view = map;
        this.socketTimeout = i;
        this.listenOnAllIPs = z;

        this.authServer = quorumAuthServer;
        this.authLearner = quorumAuthLearner;
        this.quorumSaslAuthEnabled = z2;

        initializeConnectionExecutor(j, i2);

        final Listener electionListener = new Listener();
        electionListener.setName("QuorumPeerListener");
        this.listener = electionListener;
    }

    /**
     * Creates the thread pool that runs connection initiation and connection receiving tasks.
     *
     * <p>The pool has 3 core threads and at most {@code i} threads, hands tasks over through a
     * {@link SynchronousQueue} (no buffering), and lets idle threads, core threads included,
     * time out after 60 seconds.
     *
     * @param j this server's id; only used in the names of the created threads
     * @param i maximum pool size
     * @throws IllegalArgumentException if {@code i} is smaller than the core size of 3
     *                                  (thrown by the {@link ThreadPoolExecutor} constructor)
     */
    private void initializeConnectionExecutor(long j, int i) {
        AtomicInteger threadIndex = new AtomicInteger(1);
        SecurityManager securityManager = System.getSecurityManager();
        // New threads join the security manager's group when one is installed, else the caller's group.
        ThreadGroup threadGroup = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
        this.connectionExecutor = new ThreadPoolExecutor(3, i, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), runnable -> {
            String threadName = String.format("QuorumConnectionThread-[myid=%d]-%d", j, threadIndex.getAndIncrement());
            return new Thread(threadGroup, runnable, threadName);
        });
        this.connectionExecutor.allowCoreThreadTimeOut(true);
    }

    /**
     * Looks up server {@code j} in the voting view and synchronously opens a connection to its
     * election address via {@code initiateConnection}.
     *
     * @param j id of the server to connect to; it must be present in the voting view, otherwise
     *          the lookup result is null and dereferencing it throws
     */
    public void testInitiateConnection(long j) {
        Long serverId = Long.valueOf(j);
        LOG.debug("Opening channel to server {}", serverId);
        QuorumPeer.QuorumServer target = this.self.getVotingView().get(serverId);
        initiateConnection(target.electionAddr, serverId);
    }

    /**
     * Opens a socket to the given election address on the calling thread and, once connected,
     * hands it to {@code startConnection}.
     *
     * <p>An SSL socket is created when {@code self.isSslQuorum()} is true, otherwise the socket
     * comes from {@code SOCKET_FACTORY}. Every failure is logged and ends with the socket being
     * closed; nothing is rethrown to the caller.
     *
     * @param multipleAddresses election addresses of the remote server
     * @param l                 id of the remote server
     */
    public void initiateConnection(MultipleAddresses multipleAddresses, Long l) {
        Socket channel = null;
        try {
            LOG.debug("Opening channel to server {}", l);
            if (this.self.isSslQuorum()) {
                channel = this.self.getX509Util().createSSLSocket();
            } else {
                channel = SOCKET_FACTORY.get();
            }
            setSockOpts(channel);
            channel.connect(multipleAddresses.getReachableOrOne(), this.cnxTO);

            if (channel instanceof SSLSocket) {
                SSLSocket secureChannel = (SSLSocket) channel;
                secureChannel.startHandshake();
                LOG.info("SSL handshake complete with {} - {} - {}", secureChannel.getRemoteSocketAddress(), secureChannel.getSession().getProtocol(), secureChannel.getSession().getCipherSuite());
            }
            LOG.debug("Connected to server {} using election address: {}:{}", l, channel.getInetAddress(), Integer.valueOf(channel.getPort()));

            // Failures while starting the connection are reported separately from connect failures.
            try {
                startConnection(channel, l);
            } catch (IOException startFailure) {
                LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection", l, channel.getRemoteSocketAddress(), startFailure);
                closeSocket(channel);
            }
        } catch (X509Exception secureFailure) {
            LOG.warn("Cannot open secure channel to {} at election address {}", l, multipleAddresses, secureFailure);
            closeSocket(channel);
        } catch (IOException | UnresolvedAddressException connectFailure) {
            LOG.warn("Cannot open channel to {} at election address {}", l, multipleAddresses, connectFailure);
            closeSocket(channel);
        }
    }

    /**
     * Submits a connection attempt to server {@code l} to the connection executor, unless one is
     * already registered in {@code inprogressConnections}.
     *
     * @param multipleAddresses election addresses of the remote server
     * @param l                 id of the remote server
     * @return {@code true} when the request was submitted or one was already in progress;
     *         {@code false} when submitting to the executor failed
     */
    public boolean initiateConnectionAsync(MultipleAddresses multipleAddresses, Long l) {
        boolean newlyRegistered = this.inprogressConnections.add(l);
        if (newlyRegistered) {
            try {
                this.connectionExecutor.execute(new QuorumConnectionReqThread(multipleAddresses, l));
                this.connectionThreadCnt.incrementAndGet();
                return true;
            } catch (Throwable submitFailure) {
                // Undo the registration so that a later request for this server is not skipped.
                this.inprogressConnections.remove(l);
                LOG.error("Exception while submitting quorum connection request", submitFailure);
                return false;
            }
        }
        LOG.debug("Connection request to server id: {} is already in progress, so skipping this request", l);
        return true;
    }

    /**
     * Sends this server's preamble over a freshly connected socket, authenticates, and then
     * either drops the connection or starts the sender/receiver workers for it.
     *
     * <p>The preamble is: protocol version (long), this server's id (long), the length of the
     * address string (int) and the address string itself, with addresses joined by {@code "|"}.
     * The connection is kept only when this server's id is not smaller than {@code l}; otherwise
     * the socket is closed.
     *
     * @param socket connected socket to the remote server
     * @param l      id of the remote server
     * @return {@code true} if workers were started for the socket; {@code false} if the
     *         connection was dropped or an I/O error occurred (the socket is closed in both cases)
     * @throws IOException declared by the signature; I/O errors raised inside the body are caught
     *                     and logged here
     */
    private boolean startConnection(Socket socket, Long l) throws IOException {
        LOG.debug("startConnection (myId:{} --> sid:{})", Long.valueOf(this.self.getMyId()), l);
        try {
            DataOutputStream dout = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
            long protocolVersion = this.self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
            dout.writeLong(protocolVersion);
            dout.writeLong(this.self.getMyId());

            // V2 advertises every election address, V1 a single one.
            String joinedAddresses = (String) (protocolVersion == PROTOCOL_VERSION_V2 ? this.self.getElectionAddress().getAllAddresses() : Arrays.asList(this.self.getElectionAddress().getOne())).stream().map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
            // Encode with an explicit charset so the bytes on the wire do not depend on the
            // platform default of the sending JVM.
            byte[] addressBytes = joinedAddresses.getBytes(StandardCharsets.UTF_8);
            dout.writeInt(addressBytes.length);
            dout.write(addressBytes);
            dout.flush();

            DataInputStream din = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
            QuorumPeer.QuorumServer remoteServer = this.self.getVotingView().get(l);
            if (remoteServer != null) {
                this.authLearner.authenticate(socket, remoteServer.hostname);
            }

            if (l.longValue() > this.self.getMyId()) {
                LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", Long.valueOf(this.self.getMyId()), l);
                closeSocket(socket);
                return false;
            }

            LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", Long.valueOf(this.self.getMyId()), l);
            SendWorker newSender = new SendWorker(socket, l);
            RecvWorker newReceiver = new RecvWorker(socket, din, l, newSender);
            newSender.setRecv(newReceiver);

            // A worker already registered for this server is finished before being replaced.
            SendWorker previousSender = this.senderWorkerMap.get(l);
            if (previousSender != null) {
                previousSender.finish();
            }
            this.senderWorkerMap.put(l, newSender);
            this.queueSendMap.putIfAbsent(l, new CircularBlockingQueue(1));

            newSender.start();
            newReceiver.start();
            return true;
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", (Throwable) e);
            closeSocket(socket);
            return false;
        }
    }

    /**
     * Handles an accepted connection on the calling thread: wraps the socket's input stream and
     * passes both to {@code handleConnection}.
     *
     * <p>An {@link IOException} is logged (address at error level, details at debug level) and
     * the socket is closed.
     *
     * @param socket the accepted socket
     */
    public void receiveConnection(Socket socket) {
        try {
            BufferedInputStream buffered = new BufferedInputStream(socket.getInputStream());
            DataInputStream din = new DataInputStream(buffered);
            LOG.debug("Sync handling of connection request received from: {}", socket.getRemoteSocketAddress());
            handleConnection(socket, din);
        } catch (IOException ioFailure) {
            LOG.error("Exception handling connection, addr: {}, closing server connection", socket.getRemoteSocketAddress());
            LOG.debug("Exception details: ", (Throwable) ioFailure);
            closeSocket(socket);
        }
    }

    /**
     * Hands an accepted connection to the connection executor instead of processing it on the
     * calling thread.
     *
     * <p>If the task cannot be submitted, the failure is logged and the socket is closed.
     *
     * @param socket the accepted socket
     */
    public void receiveConnectionAsync(Socket socket) {
        try {
            LOG.debug("Async handling of connection request received from: {}", socket.getRemoteSocketAddress());
            QuorumConnectionReceiverThread receiverTask = new QuorumConnectionReceiverThread(socket);
            this.connectionExecutor.execute(receiverTask);
            this.connectionThreadCnt.incrementAndGet();
        } catch (Throwable submitFailure) {
            LOG.error("Exception handling connection, addr: {}, closing server connection", socket.getRemoteSocketAddress());
            LOG.debug("Exception details: ", submitFailure);
            closeSocket(socket);
        }
    }

    /**
     * Reads the preamble of an accepted connection, authenticates the remote side and decides
     * what to do with the socket based on the remote server id.
     *
     * <ul>
     *   <li>Remote id smaller than ours: the socket is closed, any worker registered for that id
     *       is finished, and a new outgoing connection is requested via {@code connectOne}.</li>
     *   <li>Remote id equal to ours: a warning is logged and the socket is closed.</li>
     *   <li>Otherwise: sender/receiver workers are created for the socket and started.</li>
     * </ul>
     *
     * <p>An id of {@code Long.MAX_VALUE} is replaced by the next value taken from
     * {@code observerCounter}.
     *
     * @param socket          the accepted socket
     * @param dataInputStream stream over the socket's input
     * @throws IOException declared by the signature; I/O errors raised inside the body are caught,
     *                     logged, and the socket is closed
     */
    private void handleConnection(Socket socket, DataInputStream dataInputStream) throws IOException {
        Long sid;
        MultipleAddresses electionAddr = null;
        try {
            Long firstValue = Long.valueOf(dataInputStream.readLong());
            if (firstValue.longValue() >= 0) {
                // A non-negative first value is used directly as the remote server id.
                sid = firstValue;
            } else {
                // A negative first value is passed on to InitialMessage.parse with the stream.
                try {
                    InitialMessage initialMessage = InitialMessage.parse(firstValue, dataInputStream);
                    sid = initialMessage.sid;
                    if (!initialMessage.electionAddr.isEmpty()) {
                        electionAddr = new MultipleAddresses(initialMessage.electionAddr, Duration.ofMillis(this.self.getMultiAddressReachabilityCheckTimeoutMs()));
                    }
                    LOG.debug("Initial message parsed by {}: {}", Long.valueOf(this.self.getMyId()), initialMessage.toString());
                } catch (InitialMessage.InitialMessageException e) {
                    LOG.error("Initial message parsing error!", (Throwable) e);
                    closeSocket(socket);
                    return;
                }
            }

            if (sid.longValue() == Long.MAX_VALUE) {
                sid = Long.valueOf(this.observerCounter.getAndDecrement());
                LOG.info("Setting arbitrary identifier to observer: {}", sid);
            }

            this.authServer.authenticate(socket, dataInputStream);

            if (sid.longValue() < this.self.getMyId()) {
                SendWorker staleSender = this.senderWorkerMap.get(sid);
                if (staleSender != null) {
                    staleSender.finish();
                }
                LOG.debug("Create new connection to server: {}", sid);
                closeSocket(socket);
                if (electionAddr != null) {
                    connectOne(sid.longValue(), electionAddr);
                } else {
                    connectOne(sid.longValue());
                }
                return;
            }

            if (sid.longValue() == this.self.getMyId()) {
                LOG.warn("We got a connection request from a server with our own ID. This should be either a configuration error, or a bug.");
                // No worker takes ownership of this socket, so release it here instead of leaking it.
                closeSocket(socket);
                return;
            }

            SendWorker newSender = new SendWorker(socket, sid);
            RecvWorker newReceiver = new RecvWorker(socket, dataInputStream, sid, newSender);
            newSender.setRecv(newReceiver);

            // A worker already registered for this server is finished before being replaced.
            SendWorker previousSender = this.senderWorkerMap.get(sid);
            if (previousSender != null) {
                previousSender.finish();
            }
            this.senderWorkerMap.put(sid, newSender);
            this.queueSendMap.putIfAbsent(sid, new CircularBlockingQueue(1));

            newSender.start();
            newReceiver.start();
        } catch (IOException e) {
            LOG.warn("Exception reading or writing challenge", (Throwable) e);
            closeSocket(socket);
        }
    }

    /**
     * Queues a message for server {@code l}.
     *
     * <p>A message addressed to this server itself is rewound to position 0 and a duplicate of
     * the buffer is put straight on the receive queue. Any other message goes to the destination's
     * send queue (created on first use with capacity 1), after which a connection to that server
     * is requested.
     *
     * @param l          id of the destination server
     * @param byteBuffer the message payload
     */
    public void toSend(Long l, ByteBuffer byteBuffer) {
        long destination = l.longValue();
        if (this.mySid != destination) {
            BlockingQueue<ByteBuffer> outbound = this.queueSendMap.computeIfAbsent(l, serverId -> new CircularBlockingQueue<>(1));
            addToSendQueue(outbound, byteBuffer);
            connectOne(destination);
            return;
        }
        // Loopback delivery: no socket involved.
        byteBuffer.position(0);
        addToRecvQueue(new Message(byteBuffer.duplicate(), destination));
    }

    /**
     * Ensures there is a connection to server {@code j}, using the given election addresses.
     *
     * <p>If no send worker is registered for the server, a connection attempt is submitted
     * asynchronously. If one exists and multi-address mode with reachability checking is enabled
     * (and more than one address was given), the existing worker is asked to verify in the
     * background that its socket is still reachable.
     *
     * @param j                 id of the remote server
     * @param multipleAddresses election addresses of the remote server
     * @return the result of {@code initiateConnectionAsync} when a new attempt was needed,
     *         {@code true} when a connection already exists
     */
    synchronized boolean connectOne(long j, MultipleAddresses multipleAddresses) {
        // Look the worker up once: it can be removed from the map concurrently, so a second
        // lookup could return null even though the first one did not.
        SendWorker existingWorker = this.senderWorkerMap.get(Long.valueOf(j));
        if (existingWorker == null) {
            return initiateConnectionAsync(multipleAddresses, Long.valueOf(j));
        }
        LOG.debug("There is a connection already for server {}", Long.valueOf(j));
        if (this.self.isMultiAddressEnabled() && multipleAddresses.size() > 1 && this.self.isMultiAddressReachabilityCheckEnabled()) {
            existingWorker.asyncValidateIfSocketIsStillReachable();
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Ensures there is a connection to server {@code j}, looking its election address up in the
     * peer's current view and, failing that, in the members of the last seen quorum verifier.
     *
     * <p>If a send worker already exists, at most a background reachability check is triggered
     * (when multi-address mode and the reachability check are both enabled). Otherwise, under
     * {@code self.QV_LOCK}, the server's socket addresses are re-created and a connection is
     * attempted with the address from the current view first; the address from the last seen
     * verifier is tried when the server is not in the current view or the two addresses differ.
     * A warning is logged when the id is found in neither.
     *
     * @param j id of the remote server
     */
    public synchronized void connectOne(long j) {
        // Look the worker up once: it can be removed from the map concurrently, so a second
        // lookup could return null even though the first one did not.
        SendWorker existingWorker = this.senderWorkerMap.get(Long.valueOf(j));
        if (existingWorker != null) {
            LOG.debug("There is a connection already for server {}", Long.valueOf(j));
            if (this.self.isMultiAddressEnabled() && this.self.isMultiAddressReachabilityCheckEnabled()) {
                existingWorker.asyncValidateIfSocketIsStillReachable();
            }
            return;
        }
        synchronized (this.self.QV_LOCK) {
            boolean knownId = false;
            this.self.recreateSocketAddresses(j);
            Map<Long, QuorumPeer.QuorumServer> lastCommittedView = this.self.getView();
            QuorumVerifier lastSeenQV = this.self.getLastSeenQuorumVerifier();
            // Guard the dereference: without a last seen verifier there are no proposed members.
            Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV != null ? lastSeenQV.getAllMembers() : Collections.<Long, QuorumPeer.QuorumServer>emptyMap();
            if (lastCommittedView.containsKey(Long.valueOf(j))) {
                knownId = true;
                LOG.debug("Server {} knows {} already, it is in the lastCommittedView", Long.valueOf(this.self.getMyId()), Long.valueOf(j));
                if (connectOne(j, lastCommittedView.get(Long.valueOf(j)).electionAddr)) {
                    return;
                }
            }
            if (lastProposedView.containsKey(Long.valueOf(j)) && (!knownId || !lastProposedView.get(Long.valueOf(j)).electionAddr.equals(lastCommittedView.get(Long.valueOf(j)).electionAddr))) {
                knownId = true;
                LOG.debug("Server {} knows {} already, it is in the lastProposedView", Long.valueOf(this.self.getMyId()), Long.valueOf(j));
                if (connectOne(j, lastProposedView.get(Long.valueOf(j)).electionAddr)) {
                    return;
                }
            }
            if (!knownId) {
                LOG.warn("Invalid server id: {} ", Long.valueOf(j));
            }
        }
    }

    /**
     * Requests a connection, via {@code connectOne}, to every server id that currently has an
     * entry in {@code queueSendMap}.
     */
    public void connectAll() {
        for (Enumeration<Long> serverIds = this.queueSendMap.keys(); serverIds.hasMoreElements();) {
            Long serverId = serverIds.nextElement();
            connectOne(serverId.longValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports whether at least one send queue is currently empty.
     *
     * @return {@code true} as soon as a queue of size 0 is found; {@code false} when every queue
     *         holds something or there are no queues at all
     */
    public boolean haveDelivered() {
        for (BlockingQueue<ByteBuffer> pending : this.queueSendMap.values()) {
            int pendingCount = pending.size();
            LOG.debug("Queue size: {}", Integer.valueOf(pendingCount));
            if (pendingCount == 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Shuts the connection manager down: sets the {@code shutdown} flag, halts the listener and
     * waits for it to terminate, finishes all send workers, shuts down the connection executor
     * and clears the in-progress bookkeeping.
     *
     * <p>If the calling thread is interrupted while waiting for the listener, the remaining
     * shutdown steps still run; the thread's interrupt status is then restored so that callers
     * can observe the interruption.
     */
    public void halt() {
        this.shutdown = true;
        LOG.debug("Halting listener");
        this.listener.halt();

        boolean interrupted = false;
        try {
            try {
                this.listener.join();
            } catch (InterruptedException e) {
                LOG.warn("Got interrupted before joining the listener", (Throwable) e);
                interrupted = true;
            }
            softHalt();
            if (this.connectionExecutor != null) {
                this.connectionExecutor.shutdown();
            }
            this.inprogressConnections.clear();
            resetConnectionThreadCount();
        } finally {
            // Restored last so the interrupt cannot affect the shutdown steps above.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Calls {@code finish()} on every send worker currently in {@code senderWorkerMap}; the
     * listener and the connection executor are not touched here.
     */
    public void softHalt() {
        Iterator<SendWorker> workers = this.senderWorkerMap.values().iterator();
        while (workers.hasNext()) {
            SendWorker worker = workers.next();
            LOG.debug("Server {} is soft-halting sender towards: {}", Long.valueOf(this.self.getMyId()), worker);
            worker.finish();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Applies this manager's socket settings: Nagle's algorithm off, keep-alive as configured in
     * {@code tcpKeepAlive}, and the read timeout from {@code socketTimeout}.
     *
     * @param socket the socket to configure
     * @throws SocketException if one of the options cannot be set
     */
    public void setSockOpts(Socket socket) throws SocketException {
        final boolean keepAlive = this.tcpKeepAlive;
        final int readTimeout = this.socketTimeout;
        socket.setTcpNoDelay(true);
        socket.setKeepAlive(keepAlive);
        socket.setSoTimeout(readTimeout);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the given socket, logging (not rethrowing) any {@link IOException}.
     *
     * @param socket the socket to close; {@code null} is accepted and ignored
     */
    public void closeSocket(Socket socket) {
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException closeFailure) {
                LOG.error("Exception while closing", (Throwable) closeFailure);
            }
        }
    }

    /**
     * Returns the current value of the {@code threadCnt} counter.
     *
     * @return the counter value, widened to {@code long} if necessary
     */
    public long getThreadCount() {
        long current = this.threadCnt.get();
        return current;
    }

    /**
     * Returns the current value of the {@code connectionThreadCnt} counter, which is incremented
     * each time a task is handed to the connection executor.
     *
     * @return the counter value, widened to {@code long} if necessary
     */
    public long getConnectionThreadCount() {
        long current = this.connectionThreadCnt.get();
        return current;
    }

    /** Sets the {@code connectionThreadCnt} counter back to zero. */
    private void resetConnectionThreadCount() {
        connectionThreadCnt.set(0);
    }

    /**
     * Offers a message to the given send queue without blocking.
     *
     * @param blockingQueue the send queue of the destination server
     * @param byteBuffer    the message to enqueue
     * @throws RuntimeException if the queue rejects the message
     */
    private void addToSendQueue(BlockingQueue<ByteBuffer> blockingQueue, ByteBuffer byteBuffer) {
        boolean accepted = blockingQueue.offer(byteBuffer);
        if (!accepted) {
            // This is the send path, so the message names the send queue.
            throw new RuntimeException("Could not insert into send queue");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells whether the given send queue currently holds no messages.
     *
     * @param blockingQueue the send queue to inspect
     * @return the result of {@code blockingQueue.isEmpty()}
     */
    public boolean isSendQueueEmpty(BlockingQueue<ByteBuffer> blockingQueue) {
        final boolean nothingPending = blockingQueue.isEmpty();
        return nothingPending;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Takes the next message from the given send queue, waiting up to the given time for one to
     * become available.
     *
     * @param blockingQueue the send queue to poll
     * @param j             how long to wait, in units of {@code timeUnit}
     * @param timeUnit      unit of {@code j}
     * @return the head of the queue, or {@code null} if the wait elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    public ByteBuffer pollSendQueue(BlockingQueue<ByteBuffer> blockingQueue, long j, TimeUnit timeUnit) throws InterruptedException {
        final ByteBuffer next = blockingQueue.poll(j, timeUnit);
        return next;
    }

    /**
     * Offers a message to the receive queue without blocking.
     *
     * @param message the message to enqueue
     * @throws RuntimeException if the queue rejects the message
     */
    public void addToRecvQueue(Message message) {
        boolean accepted = this.recvQueue.offer(message);
        if (accepted) {
            return;
        }
        throw new RuntimeException("Could not insert into receive queue");
    }

    /**
     * Takes the next message from the receive queue, waiting up to the given time for one to
     * become available.
     *
     * @param j        how long to wait, in units of {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return the result of polling {@code recvQueue} with that timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public Message pollRecvQueue(long j, TimeUnit timeUnit) throws InterruptedException {
        final Message next = this.recvQueue.poll(j, timeUnit);
        return next;
    }

    /**
     * Tells whether a send worker is currently registered for the given server.
     *
     * @param j id of the remote server
     * @return {@code true} if {@code senderWorkerMap} holds a worker for {@code j}
     */
    public boolean connectedToPeer(long j) {
        SendWorker worker = this.senderWorkerMap.get(Long.valueOf(j));
        return worker != null;
    }

    /**
     * Reports the reconfig setting of the owning peer.
     *
     * @return whatever {@code self.isReconfigEnabled()} returns
     */
    public boolean isReconfigEnabled() {
        final boolean enabled = this.self.isReconfigEnabled();
        return enabled;
    }
}
