package org.apache.hadoop.hdfs.server.federation.router;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.class */
public class ConnectionManager {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class);
    protected static final float MIN_ACTIVE_RATIO = 0.5f;
    private final Configuration conf;
    private final int maxSize;
    private final long poolCleanupPeriodMs;
    private final long connectionCleanupPeriodMs;
    private final BlockingQueue<ConnectionPool> creatorQueue;
    private final int creatorQueueMaxSize;
    private final ConnectionCreator creator;
    private final int minSize = 1;
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.readWriteLock.readLock();
    private final Lock writeLock = this.readWriteLock.writeLock();
    private final ScheduledThreadPoolExecutor cleaner = new ScheduledThreadPoolExecutor(1);
    private boolean running = false;
    private final Map<ConnectionPoolId, ConnectionPool> pools = new HashMap();

    /* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/router/ConnectionManager$CleanupTask.class */
    private class CleanupTask implements Runnable {
        private CleanupTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            long now = Time.now();
            LinkedList linkedList = new LinkedList();
            ConnectionManager.this.readLock.lock();
            try {
                for (Map.Entry entry : ConnectionManager.this.pools.entrySet()) {
                    ConnectionPool connectionPool = (ConnectionPool) entry.getValue();
                    long lastActiveTime = connectionPool.getLastActiveTime();
                    boolean z = now > lastActiveTime + ConnectionManager.this.poolCleanupPeriodMs;
                    if (lastActiveTime <= 0 || !z) {
                        ConnectionManager.LOG.debug("Cleaning up {}", connectionPool);
                        ConnectionManager.this.cleanup(connectionPool);
                    } else {
                        ConnectionManager.LOG.debug("Closing and removing stale pool {}", connectionPool);
                        connectionPool.close();
                        linkedList.add((ConnectionPoolId) entry.getKey());
                    }
                }
                if (linkedList.isEmpty()) {
                    return;
                }
                ConnectionManager.this.writeLock.lock();
                try {
                    Iterator it = linkedList.iterator();
                    while (it.hasNext()) {
                        ConnectionManager.this.pools.remove((ConnectionPoolId) it.next());
                    }
                } finally {
                    ConnectionManager.this.writeLock.unlock();
                }
            } finally {
                ConnectionManager.this.readLock.unlock();
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/router/ConnectionManager$ConnectionCreator.class */
    private static class ConnectionCreator extends Thread {
        private boolean running;
        private BlockingQueue<ConnectionPool> queue;

        ConnectionCreator(BlockingQueue<ConnectionPool> blockingQueue) {
            super("Connection creator");
            this.running = true;
            this.queue = blockingQueue;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.running) {
                try {
                    ConnectionPool take = this.queue.take();
                    try {
                        int numConnections = take.getNumConnections();
                        int numActiveConnections = take.getNumActiveConnections();
                        if (take.getNumConnections() >= take.getMaxSize() || numActiveConnections < ConnectionManager.MIN_ACTIVE_RATIO * numConnections) {
                            ConnectionManager.LOG.debug("Cannot add more than {} connections to {}", Integer.valueOf(take.getMaxSize()), take);
                        } else {
                            take.addConnection(take.newConnection());
                        }
                    } catch (IOException e) {
                        ConnectionManager.LOG.error("Cannot create a new connection", e);
                    }
                } catch (InterruptedException e2) {
                    ConnectionManager.LOG.error("The connection creator was interrupted");
                    this.running = false;
                }
            }
        }

        public void shutdown() {
            this.running = false;
            interrupt();
        }
    }

    public ConnectionManager(Configuration configuration) {
        this.conf = configuration;
        this.maxSize = this.conf.getInt(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE, 64);
        this.creatorQueueMaxSize = this.conf.getInt(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE, 100);
        this.creatorQueue = new ArrayBlockingQueue(this.creatorQueueMaxSize);
        this.creator = new ConnectionCreator(this.creatorQueue);
        this.creator.setDaemon(true);
        this.poolCleanupPeriodMs = this.conf.getLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN, RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT);
        LOG.info("Cleaning connection pools every {} seconds", Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(this.poolCleanupPeriodMs)));
        this.connectionCleanupPeriodMs = this.conf.getLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS, RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT);
        LOG.info("Cleaning connections every {} seconds", Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs)));
    }

    public void start() {
        this.creator.start();
        long min = Math.min(this.poolCleanupPeriodMs, this.connectionCleanupPeriodMs);
        LOG.info("Cleaning every {} seconds", Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(min)));
        this.cleaner.scheduleAtFixedRate(new CleanupTask(), 0L, min, TimeUnit.MILLISECONDS);
        this.running = true;
    }

    public void close() {
        this.creator.shutdown();
        this.cleaner.shutdown();
        this.running = false;
        this.writeLock.lock();
        try {
            Iterator<ConnectionPool> it = this.pools.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.pools.clear();
            this.writeLock.unlock();
        } catch (Throwable th) {
            this.writeLock.unlock();
            throw th;
        }
    }

    public ConnectionContext getConnection(UserGroupInformation userGroupInformation, String str, Class<?> cls) throws IOException {
        if (!this.running) {
            LOG.error("Cannot get a connection to {} because the manager isn't running", str);
            return null;
        }
        ConnectionPoolId connectionPoolId = new ConnectionPoolId(userGroupInformation, str, cls);
        this.readLock.lock();
        try {
            ConnectionPool connectionPool = this.pools.get(connectionPoolId);
            this.readLock.unlock();
            if (connectionPool == null) {
                this.writeLock.lock();
                try {
                    connectionPool = this.pools.get(connectionPoolId);
                    if (connectionPool == null) {
                        Configuration configuration = this.conf;
                        getClass();
                        connectionPool = new ConnectionPool(configuration, str, userGroupInformation, 1, this.maxSize, cls);
                        this.pools.put(connectionPoolId, connectionPool);
                    }
                } finally {
                    this.writeLock.unlock();
                }
            }
            ConnectionContext connection = connectionPool.getConnection();
            if ((connection == null || !connection.isUsable()) && !this.creatorQueue.offer(connectionPool)) {
                LOG.error("Cannot add more than {} connections at the same time", Integer.valueOf(this.creatorQueueMaxSize));
            }
            if (connection != null && connection.isClosed()) {
                LOG.error("We got a closed connection from {}", connectionPool);
                connection = null;
            }
            return connection;
        } catch (Throwable th) {
            this.readLock.unlock();
            throw th;
        }
    }

    public int getNumConnectionPools() {
        this.readLock.lock();
        try {
            int size = this.pools.size();
            this.readLock.unlock();
            return size;
        } catch (Throwable th) {
            this.readLock.unlock();
            throw th;
        }
    }

    public int getNumConnections() {
        int i = 0;
        this.readLock.lock();
        try {
            Iterator<ConnectionPool> it = this.pools.values().iterator();
            while (it.hasNext()) {
                i += it.next().getNumConnections();
            }
            return i;
        } finally {
            this.readLock.unlock();
        }
    }

    public int getNumActiveConnections() {
        int i = 0;
        this.readLock.lock();
        try {
            Iterator<ConnectionPool> it = this.pools.values().iterator();
            while (it.hasNext()) {
                i += it.next().getNumActiveConnections();
            }
            return i;
        } finally {
            this.readLock.unlock();
        }
    }

    public int getNumCreatingConnections() {
        return this.creatorQueue.size();
    }

    public String getJSON() {
        TreeMap treeMap = new TreeMap();
        this.readLock.lock();
        try {
            for (Map.Entry<ConnectionPoolId, ConnectionPool> entry : this.pools.entrySet()) {
                treeMap.put(entry.getKey().toString(), entry.getValue().getJSON());
            }
            return JSON.toString(treeMap);
        } finally {
            this.readLock.unlock();
        }
    }

    @VisibleForTesting
    Map<ConnectionPoolId, ConnectionPool> getPools() {
        return this.pools;
    }

    @VisibleForTesting
    void cleanup(ConnectionPool connectionPool) {
        if (connectionPool.getNumConnections() > connectionPool.getMinSize()) {
            long now = Time.now() - connectionPool.getLastActiveTime();
            int numConnections = connectionPool.getNumConnections();
            int numActiveConnections = connectionPool.getNumActiveConnections();
            if (now > this.connectionCleanupPeriodMs || numActiveConnections < MIN_ACTIVE_RATIO * numConnections) {
                Iterator<ConnectionContext> it = connectionPool.removeConnections(1).iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
                LOG.debug("Removed connection {} used {} seconds ago. Pool has {}/{} connections", new Object[]{connectionPool.getConnectionPoolId(), Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(now)), Integer.valueOf(connectionPool.getNumConnections()), Integer.valueOf(connectionPool.getMaxSize())});
            }
        }
    }
}
