package org.apache.hadoop.hdfs.server.federation.router;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.class */
public class TestRouterClientRejectOverload {
    private static final Logger LOG = LoggerFactory.getLogger(TestRouterClientRejectOverload.class);
    private StateStoreDFSCluster cluster;

    @Rule
    public ExpectedException exceptionRule = ExpectedException.none();

    @After
    public void cleanup() {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
    }

    private void setupCluster(boolean z, boolean z2) throws Exception {
        this.cluster = new StateStoreDFSCluster(z2, 2);
        Configuration build = new RouterConfigBuilder().stateStore().metrics().admin().rpc().heartbeat().build();
        build.setInt("dfs.federation.router.client.thread-size", 4);
        build.setBoolean("dfs.federation.router.client.reject.overload", z);
        this.cluster.setNumDatanodesPerNameservice(0);
        this.cluster.addRouterOverrides(build);
        this.cluster.startCluster();
        this.cluster.startRouters();
        this.cluster.waitClusterUp();
    }

    @Test
    public void testWithoutOverloadControl() throws Exception {
        setupCluster(false, false);
        testOverloaded(0);
        FederationTestUtils.simulateSlowNamenode(this.cluster.getCluster().getNameNode(0), 1);
        testOverloaded(0);
        Iterator<MiniRouterDFSCluster.RouterContext> it = this.cluster.getRouters().iterator();
        while (it.hasNext()) {
            Assert.assertEquals(0L, it.next().getRouter().getRpcServer().getRPCMetrics().getProxyOpFailureClientOverloaded());
        }
    }

    @Test
    public void testOverloadControl() throws Exception {
        setupCluster(true, false);
        List<MiniRouterDFSCluster.RouterContext> routers = this.cluster.getRouters();
        FederationRPCMetrics rPCMetrics = routers.get(0).getRouter().getRpcServer().getRPCMetrics();
        FederationRPCMetrics rPCMetrics2 = routers.get(1).getRouter().getRpcServer().getRPCMetrics();
        testOverloaded(0);
        Assert.assertEquals(0L, rPCMetrics.getProxyOpFailureClientOverloaded());
        Assert.assertEquals(0L, rPCMetrics2.getProxyOpFailureClientOverloaded());
        FederationTestUtils.simulateSlowNamenode(this.cluster.getCluster().getNameNode(0), 1);
        testOverloaded(4, 6);
        Assert.assertTrue(rPCMetrics.getProxyOpFailureClientOverloaded() + rPCMetrics2.getProxyOpFailureClientOverloaded() >= 4);
        Configuration routerClientConf = this.cluster.getRouterClientConf();
        long proxyOps = rPCMetrics.getProxyOps();
        long proxyOps2 = rPCMetrics2.getProxyOps();
        testOverloaded(0, 0, new URI("hdfs://fed/"), routerClientConf, 10);
        long proxyOps3 = rPCMetrics.getProxyOps() - proxyOps;
        long proxyOps4 = rPCMetrics2.getProxyOps() - proxyOps2;
        Assert.assertEquals(20L, proxyOps3 + proxyOps4);
        Assert.assertTrue(proxyOps3 + " operations: not distributed", proxyOps3 >= 8);
        Assert.assertTrue(proxyOps4 + " operations: not distributed", proxyOps4 >= 8);
    }

    private void testOverloaded(int i) throws Exception {
        testOverloaded(i, i);
    }

    private void testOverloaded(int i, int i2) throws Exception {
        testOverloaded(i, i2, this.cluster.getRandomRouter().getFileSystemURI(), new HdfsConfiguration(), 10);
    }

    private void testOverloaded(int i, int i2, URI uri, Configuration configuration, int i3) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(i3);
        ArrayList arrayList = new ArrayList();
        for (int i4 = 0; i4 < i3; i4++) {
            int i5 = i4 * 50;
            arrayList.add(newFixedThreadPool.submit(() -> {
                DFSClient dFSClient = null;
                try {
                    try {
                        Thread.sleep(i5);
                        dFSClient = new DFSClient(uri, configuration);
                        dFSClient.getNamenode().renewLease(dFSClient.getClientName(), (List) null);
                        if (dFSClient != null) {
                            try {
                                dFSClient.close();
                            } catch (IOException e) {
                                LOG.error("Cannot close the client");
                            }
                        }
                    } catch (IOException e2) {
                        Assert.fail("Unexpected exception: " + e2);
                        if (dFSClient != null) {
                            try {
                                dFSClient.close();
                            } catch (IOException e3) {
                                LOG.error("Cannot close the client");
                            }
                        }
                    } catch (InterruptedException e4) {
                        Assert.fail("Cannot sleep: " + e4);
                        if (dFSClient != null) {
                            try {
                                dFSClient.close();
                            } catch (IOException e5) {
                                LOG.error("Cannot close the client");
                            }
                        }
                    } catch (RemoteException e6) {
                        IOException unwrapRemoteException = e6.unwrapRemoteException();
                        Assert.assertTrue("Wrong exception: " + unwrapRemoteException, unwrapRemoteException instanceof StandbyException);
                        GenericTestUtils.assertExceptionContains("is overloaded", unwrapRemoteException);
                        atomicInteger.incrementAndGet();
                        if (dFSClient != null) {
                            try {
                                dFSClient.close();
                            } catch (IOException e7) {
                                LOG.error("Cannot close the client");
                            }
                        }
                    }
                } catch (Throwable th) {
                    if (dFSClient != null) {
                        try {
                            dFSClient.close();
                        } catch (IOException e8) {
                            LOG.error("Cannot close the client");
                        }
                    }
                    throw th;
                }
            }));
        }
        while (!arrayList.isEmpty()) {
            ((Future) arrayList.remove(0)).get();
        }
        newFixedThreadPool.shutdown();
        int i6 = atomicInteger.get();
        if (i == i2) {
            Assert.assertEquals(i, i6);
        } else {
            Assert.assertTrue("Expected >=" + i + " but was " + i6, i6 >= i);
            Assert.assertTrue("Expected <=" + i2 + " but was " + i6, i6 <= i2);
        }
    }

    @Test
    public void testConnectionNullException() throws Exception {
        setupCluster(false, false);
        FederationTestUtils.simulateThrowExceptionRouterRpcServer(this.cluster.getRouters().get(0).getRouter().getRpcServer());
        Configuration routerClientConf = this.cluster.getRouterClientConf();
        routerClientConf.setBoolean("dfs.client.failover.random.order", false);
        DFSClient dFSClient = new DFSClient(new URI("hdfs://fed"), routerClientConf);
        FederationRPCMetrics rPCMetrics = this.cluster.getRouters().get(0).getRouter().getRpcServer().getRPCMetrics();
        FederationRPCMetrics rPCMetrics2 = this.cluster.getRouters().get(1).getRouter().getRpcServer().getRPCMetrics();
        long proxyOpFailureCommunicate = rPCMetrics.getProxyOpFailureCommunicate();
        long proxyOpFailureCommunicate2 = rPCMetrics2.getProxyOpFailureCommunicate();
        dFSClient.getFileInfo("/");
        Assert.assertEquals(proxyOpFailureCommunicate + 1, rPCMetrics.getProxyOpFailureCommunicate());
        Assert.assertEquals(proxyOpFailureCommunicate2, rPCMetrics2.getProxyOpFailureCommunicate());
    }

    @Test
    public void testNoNamenodesAvailable() throws Exception {
        setupCluster(false, true);
        FederationTestUtils.transitionClusterNSToStandby(this.cluster);
        Configuration routerClientConf = this.cluster.getRouterClientConf();
        routerClientConf.setBoolean("dfs.client.failover.random.order", false);
        routerClientConf.setInt("dfs.client.retry.max.attempts", 2);
        DFSClient dFSClient = new DFSClient(new URI("hdfs://fed"), routerClientConf);
        FederationRPCMetrics rPCMetrics = this.cluster.getRouters().get(0).getRouter().getRpcServer().getRPCMetrics();
        FederationRPCMetrics rPCMetrics2 = this.cluster.getRouters().get(1).getRouter().getRpcServer().getRPCMetrics();
        long proxyOpNoNamenodes = rPCMetrics.getProxyOpNoNamenodes();
        long proxyOpNoNamenodes2 = rPCMetrics2.getProxyOpNoNamenodes();
        this.exceptionRule.expect(RemoteException.class);
        this.exceptionRule.expectMessage("org.apache.hadoop.hdfs.server.federation.router.NoNamenodesAvailableException: No namenodes available under nameservice ns0");
        dFSClient.getFileInfo("/");
        Assert.assertEquals(proxyOpNoNamenodes + 4, rPCMetrics.getProxyOpNoNamenodes());
        Assert.assertEquals(proxyOpNoNamenodes2, rPCMetrics2.getProxyOpNoNamenodes());
        FederationTestUtils.transitionClusterNSToActive(this.cluster, 0);
        for (MiniRouterDFSCluster.RouterContext routerContext : this.cluster.getRouters()) {
            Iterator it = routerContext.getRouter().getNamenodeHeartbeatServices().iterator();
            while (it.hasNext()) {
                ((NamenodeHeartbeatService) it.next()).periodicInvoke();
            }
            routerContext.getRouter().getStateStore().refreshCaches(true);
        }
        long proxyOpNoNamenodes3 = rPCMetrics.getProxyOpNoNamenodes();
        dFSClient.getFileInfo("/");
        Assert.assertEquals(proxyOpNoNamenodes3, rPCMetrics.getProxyOpNoNamenodes());
    }

    @Test
    public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception {
        setupCluster(false, true);
        FederationTestUtils.transitionClusterNSToStandby(this.cluster);
        for (MiniRouterDFSCluster.RouterContext routerContext : this.cluster.getRouters()) {
            Iterator it = routerContext.getRouter().getNamenodeHeartbeatServices().iterator();
            while (it.hasNext()) {
                ((NamenodeHeartbeatService) it.next()).periodicInvoke();
            }
            routerContext.getRouter().getStateStore().refreshCaches(true);
        }
        long now = Time.now();
        Iterator<MiniRouterDFSCluster.NamenodeContext> it2 = this.cluster.getNamenodes().iterator();
        while (it2.hasNext()) {
            Assert.assertEquals(HAServiceProtocol.HAServiceState.STANDBY.ordinal(), it2.next().getNamenode().getNameNodeState());
        }
        Configuration routerClientConf = this.cluster.getRouterClientConf();
        routerClientConf.setBoolean("dfs.client.failover.random.order", false);
        DFSClient dFSClient = new DFSClient(new URI("hdfs://fed"), routerClientConf);
        for (MiniRouterDFSCluster.RouterContext routerContext2 : this.cluster.getRouters()) {
            this.cluster.switchToActive("ns0", ((FederationNamenodeContext) routerContext2.getRouter().getNamenodeResolver().getNamenodesForNameserviceId("ns0", false).get(1)).getNamenodeId());
            Iterator it3 = routerContext2.getRouter().getNamenodeHeartbeatServices().iterator();
            while (it3.hasNext()) {
                ((NamenodeHeartbeatService) it3.next()).periodicInvoke();
            }
            Assert.assertEquals(HAServiceProtocol.HAServiceState.ACTIVE.ordinal(), this.cluster.getNamenode("ns0", r0).getNamenode().getNameNodeState());
        }
        FederationRPCMetrics rPCMetrics = this.cluster.getRouters().get(0).getRouter().getRpcServer().getRPCMetrics();
        long proxyOpNoNamenodes = rPCMetrics.getProxyOpNoNamenodes();
        dFSClient.getFileInfo("/");
        long now2 = Time.now();
        Assert.assertEquals(proxyOpNoNamenodes + 1, rPCMetrics.getProxyOpNoNamenodes());
        Assert.assertTrue(now2 - now < this.cluster.getCacheFlushInterval());
    }

    @Test
    public void testAsyncCallerPoolMetrics() throws Exception {
        setupCluster(true, false);
        FederationTestUtils.simulateSlowNamenode(this.cluster.getCluster().getNameNode(0), 2);
        ObjectMapper objectMapper = new ObjectMapper();
        this.cluster.getRouters().remove(1);
        FederationRPCMetrics rPCMetrics = this.cluster.getRouters().get(0).getRouter().getRpcServer().getRPCMetrics();
        Map map = (Map) objectMapper.readValue(rPCMetrics.getAsyncCallerPool(), Map.class);
        Assert.assertEquals(0L, ((Integer) map.get("active")).intValue());
        Assert.assertEquals(0L, ((Integer) map.get("total")).intValue());
        Assert.assertEquals(4L, ((Integer) map.get("max")).intValue());
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            newSingleThreadExecutor.submit(() -> {
                DFSClient dFSClient = null;
                try {
                    try {
                        dFSClient = new DFSClient(new URI("hdfs://fed"), this.cluster.getRouterClientConf());
                        dFSClient.getNamenode().renewLease(dFSClient.getClientName(), (List) null);
                        if (dFSClient != null) {
                            try {
                                dFSClient.close();
                            } catch (IOException e) {
                                LOG.error("Cannot close the client");
                            }
                        }
                    } catch (Throwable th) {
                        if (dFSClient != null) {
                            try {
                                dFSClient.close();
                            } catch (IOException e2) {
                                LOG.error("Cannot close the client");
                            }
                        }
                        throw th;
                    }
                } catch (Exception e3) {
                    Assert.fail("Client request failed: " + e3);
                    if (dFSClient != null) {
                        try {
                            dFSClient.close();
                        } catch (IOException e4) {
                            LOG.error("Cannot close the client");
                        }
                    }
                }
            });
            GenericTestUtils.waitFor(() -> {
                try {
                    Map map2 = (Map) objectMapper.readValue(rPCMetrics.getAsyncCallerPool(), Map.class);
                    if (((Integer) map2.get("active")).intValue() == 1 && ((Integer) map2.get("max")).intValue() == 4) {
                        int intValue = ((Integer) map2.get("total")).intValue();
                        return Boolean.valueOf(intValue >= 1 && intValue <= 4);
                    }
                    return false;
                } catch (Exception e) {
                    LOG.error("Not able to parse metrics result: " + e);
                    return false;
                }
            }, 100L, 2000L);
            newSingleThreadExecutor.shutdown();
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdown();
            throw th;
        }
    }
}
