package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.channels.AsynchronousCloseException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.PeerServer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.class */
public class DataXceiverServer implements Runnable {
    /** Logger for this class; the static initializer assigns it from {@code DataNode.LOG}. */
    public static final Logger LOG;
    // NOTE(review): has the same value (30) as the initializer of maxReconfigureWaitTime below and
    // is not referenced elsewhere in this file — presumably the constant was inlined; confirm.
    private static final int DEFAULT_RECONFIGURE_WAIT = 30;
    /** Server whose accept() loop is driven by run(); closed by run() or kill(). */
    private final PeerServer peerServer;
    /** Owning DataNode; supplies the run/shutdown flags, the thread group, metrics and display name. */
    private final DataNode datanode;
    /** Registered peers mapped to the thread passed to addPeer(). Accessed under {@link #lock}. */
    private final HashMap<Peer, Thread> peers = new HashMap<>();
    /** Registered peers mapped to the DataXceiver passed to addPeer(). Accessed under {@link #lock}. */
    private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<>();
    /** Guards {@link #peers}, {@link #peersXceiver} and {@link #closed}. */
    private final Lock lock = new ReentrantLock();
    /** Signalled by closePeer()/closeAllPeers() when no peers remain; awaited by waitAllPeers(). */
    private final Condition noPeers = this.lock.newCondition();
    /** Set to true once peerServer.close() has returned; addPeer() rejects new peers afterwards. */
    private boolean closed = false;
    /** Wait, in seconds, handed to the balance throttler when the mover count is lowered. */
    private int maxReconfigureWaitTime = 30;
    /** Limit that run() compares against datanode.getXceiverCount() before starting a new DataXceiver. */
    volatile int maxXceiverCount;
    /** Throttler for balancing traffic, built in the constructor from configuration. */
    final BlockBalanceThrottler balanceThrottler;
    /** Transfer throttler; null when the configured bandwidth is not positive. */
    private volatile DataTransferThrottler transferThrottler;
    /** Write throttler; null when the configured bandwidth is not positive. */
    private volatile DataTransferThrottler writeThrottler;
    /** Read throttler; null when the configured bandwidth is not positive. */
    private volatile DataTransferThrottler readThrottler;
    /** Value of "dfs.blocksize" read in the constructor (default 134217728 bytes). */
    final long estimateBlockSize;
    // Decompiler-exposed assertion flag: true when assertions are disabled for this class
    // (see the static initializer). kill() and restartNotifyPeers() test it explicitly.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/DataXceiverServer$BlockBalanceThrottler.class */
    /**
     * Throttler used for block balancing. In addition to the bandwidth limit
     * inherited from {@link DataTransferThrottler}, it limits the number of
     * concurrent movers with a fair {@link Semaphore}: each mover takes one
     * permit via {@link #acquire()} and gives it back via {@link #release()}.
     */
    public static class BlockBalanceThrottler extends DataTransferThrottler {
        /** Permits for mover slots; created fair so waiters are served in arrival order. */
        private final Semaphore semaphore;
        /** The mover limit most recently applied successfully. */
        private int maxThreads;

        /**
         * @param j bandwidth passed to the {@link DataTransferThrottler} constructor
         *          (logged as bytes/s)
         * @param i initial number of mover permits
         */
        private BlockBalanceThrottler(long j, int i) {
            super(j);
            this.semaphore = new Semaphore(i, true);
            this.maxThreads = i;
            DataXceiverServer.LOG.info("Balancing bandwidth is " + j + " bytes/s");
            DataXceiverServer.LOG.info("Number threads for balancing is " + i);
        }

        /**
         * Changes the maximum number of concurrent movers.
         *
         * <p>Raising the limit releases the extra permits immediately. Lowering
         * it has to take the surplus permits out of the semaphore, which may
         * require waiting for running movers to release them.
         *
         * @param i  the new limit; must be greater than zero
         * @param i2 maximum time, in seconds, to wait when lowering the limit
         * @return {@code true} if the limit now equals {@code i}; {@code false}
         *         if the permits could not be acquired in time or the calling
         *         thread was interrupted (the old limit stays in effect)
         * @throws IllegalArgumentException if {@code i} is not positive
         */
        public boolean setMaxConcurrentMovers(int i, int i2) {
            Preconditions.checkArgument(i > 0);
            final int delta = i - this.maxThreads;
            DataXceiverServer.LOG.debug("Change concurrent thread count to {} from {}", i, this.maxThreads);
            if (delta == 0) {
                return true;
            }
            if (delta > 0) {
                DataXceiverServer.LOG.debug("Adding thread capacity: {}", delta);
                this.semaphore.release(delta);
                this.maxThreads = i;
                return true;
            }
            try {
                DataXceiverServer.LOG.debug("Removing thread capacity: {}. Max wait: {}", delta, i2);
                final boolean acquired = this.semaphore.tryAcquire(Math.abs(delta), i2, TimeUnit.SECONDS);
                if (acquired) {
                    this.maxThreads = i;
                } else {
                    DataXceiverServer.LOG.warn("Could not lower thread count to {} from {}. Too busy.", i, this.maxThreads);
                }
                return acquired;
            } catch (InterruptedException e) {
                DataXceiverServer.LOG.warn("Interrupted before adjusting thread count: {}", delta);
                // tryAcquire cleared the interrupt status when it threw; restore it so
                // the caller can still observe that the thread was interrupted.
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /** @return the mover limit most recently applied successfully */
        @VisibleForTesting
        public int getMaxConcurrentMovers() {
            return this.maxThreads;
        }

        /**
         * Tries to take one mover permit without blocking.
         *
         * @return {@code true} if a permit was obtained
         */
        public boolean acquire() {
            return this.semaphore.tryAcquire();
        }

        /** Returns one mover permit to the semaphore. */
        public void release() {
            this.semaphore.release();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DataXceiverServer(PeerServer peerServer, Configuration configuration, DataNode dataNode) {
        this.peerServer = peerServer;
        this.datanode = dataNode;
        this.maxXceiverCount = configuration.getInt("dfs.datanode.max.transfer.threads", 4096);
        Preconditions.checkArgument(this.maxXceiverCount >= 1, "dfs.datanode.max.transfer.threads should not be less than 1.");
        this.estimateBlockSize = configuration.getLongBytes("dfs.blocksize", 134217728L);
        this.balanceThrottler = new BlockBalanceThrottler(configuration.getLongBytes("dfs.datanode.balance.bandwidthPerSec", 104857600L), configuration.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 100));
        initBandwidthPerSec(configuration);
    }

    /**
     * Creates the transfer, write and read throttlers from configuration.
     * Each throttler is left {@code null} when its configured bandwidth is
     * not positive (the default for all three keys is 0).
     *
     * @param configuration source of the three bandwidth settings
     */
    private void initBandwidthPerSec(Configuration configuration) {
        final long transferBandwidth =
            configuration.getLongBytes(DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY, 0L);
        this.transferThrottler = transferBandwidth > 0 ? new DataTransferThrottler(transferBandwidth) : null;

        final long writeBandwidth =
            configuration.getLongBytes(DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY, 0L);
        this.writeThrottler = writeBandwidth > 0 ? new DataTransferThrottler(writeBandwidth) : null;

        final long readBandwidth =
            configuration.getLongBytes(DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY, 0L);
        this.readThrottler = readBandwidth > 0 ? new DataTransferThrottler(readBandwidth) : null;
    }

    /**
     * Accept loop of the server.
     *
     * <p>While the DataNode should run and is not shutting down for upgrade,
     * accepts a peer, rejects it if the xceiver count exceeds
     * {@link #maxXceiverCount}, and otherwise starts a {@link Daemon} running
     * a new {@link DataXceiver} for it. When the loop ends the peer server is
     * closed (if not already), peers are notified and briefly waited for when
     * shutting down for upgrade, and all remaining peers are closed.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (this.datanode.shouldRun && !this.datanode.shutdownForUpgrade) {
            // Reset on every iteration: a peer handed to a DataXceiver in an earlier
            // iteration must not be closed because a later accept() fails.
            Peer peer = null;
            try {
                peer = this.peerServer.accept();

                // Enforce the xceiver limit inside the try so that the rejection is
                // handled by the IOException branch below, which closes the peer.
                final int xceiverCount = this.datanode.getXceiverCount();
                if (xceiverCount > this.maxXceiverCount) {
                    throw new IOException("Xceiver count " + xceiverCount + " exceeds the limit of concurrent xceivers: " + this.maxXceiverCount);
                }

                new Daemon(this.datanode.threadGroup, DataXceiver.create(peer, this.datanode, this)).start();
            } catch (SocketTimeoutException ignored) {
                // Nothing to do: fall through so the loop condition is re-evaluated.
            } catch (AsynchronousCloseException e) {
                // Only worth a warning if the server was not being stopped.
                if (this.datanode.shouldRun && !this.datanode.shutdownForUpgrade) {
                    LOG.warn("{}:DataXceiverServer", this.datanode.getDisplayName(), e);
                }
            } catch (IOException e) {
                IOUtils.closeStream(peer);
                LOG.warn("{}:DataXceiverServer", this.datanode.getDisplayName(), e);
            } catch (OutOfMemoryError e) {
                IOUtils.closeStream(peer);
                LOG.error("DataNode is out of memory. Will retry in 30 seconds.", e);
                try {
                    Thread.sleep(TimeUnit.SECONDS.toMillis(30L));
                } catch (InterruptedException ignored) {
                    // Deliberately not re-interrupting: the loop goes straight back to
                    // accept(), and the loop condition decides whether to continue.
                }
            } catch (Throwable th) {
                LOG.error("{}:DataXceiverServer: Exiting.", this.datanode.getDisplayName(), th);
                this.datanode.shouldRun = false;
            }
        }

        // Stop accepting connections. The lock is released exactly once, in finally,
        // whether or not close() throws.
        this.lock.lock();
        try {
            if (!this.closed) {
                this.peerServer.close();
                this.closed = true;
            }
        } catch (IOException e) {
            LOG.warn("{}:DataXceiverServer: close exception", this.datanode.getDisplayName(), e);
        } finally {
            this.lock.unlock();
        }

        // When shutting down for upgrade, interrupt the peer threads and give them
        // a short grace period before their connections are closed.
        if (this.datanode.shutdownForUpgrade) {
            restartNotifyPeers();
            LOG.info("Shutting down DataXceiverServer before restart");
            waitAllPeers(2L, TimeUnit.SECONDS);
        }

        closeAllPeers();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the peer server if it has not been closed yet. A failure to
     * close is logged and otherwise ignored.
     *
     * <p>With assertions enabled, requires that the DataNode has stopped
     * running or is shutting down for upgrade.
     */
    public void kill() {
        if (!$assertionsDisabled) {
            final boolean stopping = !this.datanode.shouldRun || this.datanode.shutdownForUpgrade;
            if (!stopping) {
                throw new AssertionError("shoudRun should be set to false or restarting should be true before killing");
            }
        }

        lock.lock();
        try {
            if (closed) {
                return;
            }
            peerServer.close();
            closed = true;
        } catch (IOException e) {
            LOG.warn("{}:DataXceiverServer.kill()", datanode.getDisplayName(), e);
        } finally {
            lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a peer together with its thread and xceiver, and increments
     * the active-xceiver metric.
     *
     * @param peer        the peer to register
     * @param thread      thread stored for the peer (interrupted by
     *                    {@code restartNotifyPeers()})
     * @param dataXceiver xceiver stored for the peer
     * @throws IOException if the server has already been closed
     */
    public void addPeer(Peer peer, Thread thread, DataXceiver dataXceiver) throws IOException {
        lock.lock();
        try {
            if (closed) {
                throw new IOException("Server closed.");
            }
            peers.put(peer, thread);
            peersXceiver.put(peer, dataXceiver);
            datanode.metrics.incrDataNodeActiveXceiversCount();
        } finally {
            lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Unregisters a peer, decrements the active-xceiver metric, closes the
     * peer, and signals {@code noPeers} if no registered peers remain.
     *
     * @param peer the peer to unregister and close
     */
    public void closePeer(Peer peer) {
        lock.lock();
        try {
            peers.remove(peer);
            peersXceiver.remove(peer);
            datanode.metrics.decrDataNodeActiveXceiversCount();
            IOUtils.closeStream(peer);

            final boolean drained = peers.isEmpty();
            if (drained) {
                noPeers.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Asks the xceiver of every registered peer to send an OOB message.
     * Does nothing unless the DataNode is shutting down for upgrade.
     *
     * <p>A failure for one peer is logged and does not stop the remaining
     * peers from being processed.
     */
    public void sendOOBToPeers() {
        this.lock.lock();
        try {
            if (!this.datanode.shutdownForUpgrade) {
                return;
            }
            for (Peer peer : this.peers.keySet()) {
                try {
                    this.peersXceiver.get(peer).sendOOB();
                } catch (IOException e) {
                    LOG.warn("Got error when sending OOB message.", e);
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted when sending OOB message.");
                }
            }
        } finally {
            // The only unlock: releasing inside the branch as well would unlock a lock
            // this thread no longer holds and throw IllegalMonitorStateException.
            this.lock.unlock();
        }
    }

    /** Calls {@code stopWriter()} on the xceiver of every registered peer. */
    public void stopWriters() {
        lock.lock();
        try {
            for (Peer peer : peers.keySet()) {
                final DataXceiver xceiver = peersXceiver.get(peer);
                xceiver.stopWriter();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Interrupts the thread of every registered peer.
     *
     * <p>With assertions enabled, requires that the DataNode is still running
     * and is shutting down for upgrade.
     */
    void restartNotifyPeers() {
        if (!$assertionsDisabled) {
            final boolean restarting = this.datanode.shouldRun && this.datanode.shutdownForUpgrade;
            if (!restarting) {
                throw new AssertionError();
            }
        }

        lock.lock();
        try {
            for (Thread peerThread : peers.values()) {
                peerThread.interrupt();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Closes every registered peer, clears both peer maps, resets the
     * active-xceiver metrics to zero and signals {@code noPeers}.
     */
    void closeAllPeers() {
        LOG.info("Closing all peers.");
        lock.lock();
        try {
            for (Peer peer : peers.keySet()) {
                IOUtils.closeStream(peer);
            }
            peers.clear();
            peersXceiver.clear();

            datanode.metrics.setDataNodeActiveXceiversCount(0);
            datanode.metrics.setDataNodeReadActiveXceiversCount(0);
            datanode.metrics.setDataNodeWriteActiveXceiversCount(0);

            noPeers.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Waits until no peers are registered or the timeout elapses.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return {@code true} if the peer map became empty; {@code false} on
     *         timeout or if the waiting thread was interrupted
     */
    private boolean waitAllPeers(long j, TimeUnit timeUnit) {
        long nanos = timeUnit.toNanos(j);
        this.lock.lock();
        try {
            // The lock must stay held across the whole loop: awaitNanos requires the
            // caller to own the lock, and the emptiness check reads guarded state.
            while (!this.peers.isEmpty()) {
                if (nanos <= 0) {
                    return false;
                }
                // awaitNanos returns the remaining time, so spurious or early wake-ups
                // do not extend the overall timeout.
                nanos = this.noPeers.awaitNanos(nanos);
            }
            return true;
        } catch (InterruptedException e) {
            LOG.debug("Interrupted waiting for peers to close");
            return false;
        } finally {
            // Single release point for every exit path.
            this.lock.unlock();
        }
    }

    /** @return the number of entries in the peer-to-thread map, read under the lock */
    int getNumPeers() {
        final int count;
        lock.lock();
        try {
            count = peers.size();
        } finally {
            lock.unlock();
        }
        return count;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the number of entries in the peer-to-xceiver map, read under the lock */
    @VisibleForTesting
    public int getNumPeersXceiver() {
        final int count;
        lock.lock();
        try {
            count = peersXceiver.size();
        } finally {
            lock.unlock();
        }
        return count;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the peer server passed to the constructor */
    @VisibleForTesting
    public PeerServer getPeerServer() {
        return peerServer;
    }

    /** @return the current transfer throttler, or {@code null} if none is set */
    public DataTransferThrottler getTransferThrottler() {
        return transferThrottler;
    }

    /** @return the current write throttler, or {@code null} if none is set */
    public DataTransferThrottler getWriteThrottler() {
        return writeThrottler;
    }

    /** @return the current read throttler, or {@code null} if none is set */
    public DataTransferThrottler getReadThrottler() {
        return readThrottler;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Unregisters a peer and decrements the active-xceiver metric. Unlike
     * {@code closePeer}, the peer itself is not closed and {@code noPeers}
     * is not signalled.
     *
     * @param peer the peer to unregister
     */
    public void releasePeer(Peer peer) {
        lock.lock();
        try {
            peers.remove(peer);
            peersXceiver.remove(peer);
            datanode.metrics.decrDataNodeActiveXceiversCount();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Applies a new concurrent-mover limit to the balance throttler, waiting
     * up to {@code maxReconfigureWaitTime} seconds if the limit is lowered.
     *
     * @param i the new limit
     * @return whether the throttler accepted the new limit
     */
    public boolean updateBalancerMaxConcurrentMovers(int i) {
        final int waitSeconds = maxReconfigureWaitTime;
        return balanceThrottler.setMaxConcurrentMovers(i, waitSeconds);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Overrides the wait used when lowering the concurrent-mover limit.
     *
     * @param i the new wait, in seconds
     */
    @VisibleForTesting
    public void setMaxReconfigureWaitTime(int i) {
        maxReconfigureWaitTime = i;
    }

    /**
     * Replaces the xceiver limit checked by the accept loop.
     *
     * @param i the new limit; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public void setMaxXceiverCount(int i) {
        final boolean positive = i > 0;
        Preconditions.checkArgument(positive, "dfs.datanode.max.transfer.threads should be larger than 0");
        maxXceiverCount = i;
    }

    /** @return the current xceiver limit */
    @VisibleForTesting
    public int getMaxXceiverCount() {
        return maxXceiverCount;
    }

    /**
     * Replaces the transfer throttler.
     *
     * @param dataTransferThrottler the new throttler; stored as given
     */
    public void setTransferThrottler(DataTransferThrottler dataTransferThrottler) {
        transferThrottler = dataTransferThrottler;
    }

    /**
     * Replaces the write throttler.
     *
     * @param dataTransferThrottler the new throttler; stored as given
     */
    public void setWriteThrottler(DataTransferThrottler dataTransferThrottler) {
        writeThrottler = dataTransferThrottler;
    }

    /**
     * Replaces the read throttler.
     *
     * @param dataTransferThrottler the new throttler; stored as given
     */
    public void setReadThrottler(DataTransferThrottler dataTransferThrottler) {
        readThrottler = dataTransferThrottler;
    }

    static {
        // True when assertions are disabled for this class; kill() and
        // restartNotifyPeers() consult it before running their checks.
        $assertionsDisabled = !DataXceiverServer.class.desiredAssertionStatus();
        // Reuse the DataNode's logger instead of creating a separate one.
        LOG = DataNode.LOG;
    }
}
