package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
@Category({MediumTests.class, ClientTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/client/TestClientScannerTimeouts.class */
public class TestClientScannerTimeouts {
    // Asynchronous client connection; assigned in setUp() and closed in after().
    private AsyncConnection ASYNC_CONN;
    // Synchronous client connection; assigned in setUp() and closed in after().
    private Connection CONN;
    // NOTE(review): presumably the client RPC timeout in ms (matches the 1000 that
    // setUpBeforeClass() stores under HBASE_RPC_TIMEOUT_KEY) -- confirm.
    private static final int rpcTimeout = 1000;
    // NOTE(review): presumably the scanner timeout period in ms (matches the 3000 that
    // setUp() stores under HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD) -- confirm.
    private static final int scanTimeout = 3000;
    // Stored under HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY in setUp(); also the lower bound
    // (in ms) that the meta open/renew/close tests expect to elapse.
    private static final int metaReadRpcTimeout = 6000;
    // Stored under HBASE_CLIENT_META_SCANNER_TIMEOUT in setUp(); also the lower bound
    // (in ms) that the meta next() tests expect to elapse.
    private static final int metaScanTimeout = 9000;
    // NOTE(review): presumably mirrors the 3 stored under HBASE_CLIENT_RETRIES_NUMBER in
    // setUpBeforeClass() -- confirm.
    private static final int CLIENT_RETRIES_NUMBER = 3;
    // Table used by the current test; reassigned by every call to setup(boolean).
    private static TableName tableName;

    // Supplies the running test method's name, which setup(boolean) turns into a table name.
    @Rule
    public TestName name = new TestName();

    // Parameterized flag; copied into
    // HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS by setUp().
    @Parameterized.Parameter
    public boolean useScannerTimeoutPeriodForNextCalls;

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestClientScannerTimeouts.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class);
    // Owns the mini cluster started in setUpBeforeClass() and stopped in tearDownAfterClass().
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    // Column family, qualifier and value written for every row by putToTable().
    private static final byte[] FAMILY = Bytes.toBytes("testFamily");
    private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
    private static final byte[] VALUE = Bytes.toBytes("testValue");
    // The four row keys that setup(boolean) writes into the test table.
    private static final byte[] ROW0 = Bytes.toBytes("row-0");
    private static final byte[] ROW1 = Bytes.toBytes("row-1");
    private static final byte[] ROW2 = Bytes.toBytes("row-2");
    private static final byte[] ROW3 = Bytes.toBytes("row-3");
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/client/TestClientScannerTimeouts$RSRpcServicesWithScanTimeout.class */
    /**
     * {@link RSRpcServices} used by this test to inject faults into scan RPCs.
     *
     * <p>Every request is first served by the real implementation. Afterwards, for the
     * scanner most recently opened on a non-meta region, the static switches below can make
     * the call sleep (so the client times out) or fail with an
     * {@link OutOfOrderScannerNextException}. All switches are static, so the test drives
     * them directly; {@link #reset()} restores the defaults.
     */
    public static class RSRpcServicesWithScanTimeout extends RSRpcServices {
        /** Id of the last scanner opened on a non-meta region; only that scanner is tampered with. */
        private long tableScannerId;
        /** Whether the one-shot exception for {@link #seqNoToThrowOn} was already thrown. */
        private static boolean threw;
        /** Whether the one-shot sleep (next / renew / close) already happened. */
        private static volatile boolean slept;
        /** nextCallSeq on which to throw once; -1 is the reset default. */
        private static long seqNoToThrowOn = -1;
        /** When true, every non-close call on the tracked scanner throws. */
        private static boolean throwAlways = false;
        /** nextCallSeq on which to sleep once; -1 is the reset default. */
        private static long seqNoToSleepOn = -1;
        private static boolean sleepOnOpen = false;
        private static boolean sleepOnRenew = false;
        private static boolean sleepOnClose = false;
        /** Number of injected throws/sleeps on next, renew and close calls (open sleeps are not counted). */
        private static int tryNumber = 0;
        // Previously initialised from an unrelated Ganglia metrics constant; the initial value
        // is derived from this test's own rpcTimeout instead. reset() overwrites it anyway.
        private static int sleepTime = rpcTimeout + 500;

        /**
         * Sets the injected sleep to {@code i + 500} ms.
         *
         * @param i the timeout, in milliseconds, that the sleep must exceed
         */
        public static void setSleepForTimeout(int i) {
            sleepTime = i + 500;
        }

        /** Restores every fault-injection switch and counter to its default. */
        public static void reset() {
            setSleepForTimeout(scanTimeout);
            seqNoToSleepOn = -1L;
            seqNoToThrowOn = -1L;
            throwAlways = false;
            threw = false;
            sleepOnOpen = false;
            sleepOnRenew = false;
            sleepOnClose = false;
            slept = false;
            tryNumber = 0;
        }

        public RSRpcServicesWithScanTimeout(HRegionServer hRegionServer) throws IOException {
            super(hRegionServer);
        }

        /**
         * Serves the scan via the superclass, then applies the configured fault.
         *
         * @param rpcController controller passed through to the superclass
         * @param scanRequest   the client's scan request
         * @return the response produced by the superclass
         * @throws ServiceException wrapping an {@link OutOfOrderScannerNextException} when a
         *         throw is injected, or whatever the superclass throws
         */
        @Override
        public ClientProtos.ScanResponse scan(RpcController rpcController, ClientProtos.ScanRequest scanRequest) throws ServiceException {
            if (!scanRequest.hasScannerId()) {
                // No scanner id in the request: this is the call that opens the scanner.
                ClientProtos.ScanResponse openResponse = super.scan(rpcController, scanRequest);
                String regionName = Bytes.toString(scanRequest.getRegion().getValue().toByteArray());
                if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
                    this.tableScannerId = openResponse.getScannerId();
                    if (sleepOnOpen) {
                        sleepForTimeout("openScanner SLEEPING");
                    }
                }
                return openResponse;
            }

            LOG.info("Got request {}", scanRequest);
            ClientProtos.ScanResponse response = super.scan(rpcController, scanRequest);
            if (this.tableScannerId != scanRequest.getScannerId()) {
                // Not the scanner under test; leave it alone.
                return response;
            }

            if (scanRequest.getCloseScanner()) {
                if (!slept && sleepOnClose) {
                    sleepForTimeout("SLEEPING");
                    slept = true;
                    tryNumber++;
                }
                return response;
            }

            boolean throwOnThisSeq = !threw && scanRequest.hasNextCallSeq()
                && seqNoToThrowOn == scanRequest.getNextCallSeq();
            if (throwAlways || throwOnThisSeq) {
                threw = true;
                tryNumber++;
                LOG.info("THROWING exception, tryNumber={}, tableScannerId={}", tryNumber, this.tableScannerId);
                throw new ServiceException(new OutOfOrderScannerNextException());
            }

            boolean sleepOnThisSeq = scanRequest.hasNextCallSeq()
                && seqNoToSleepOn == scanRequest.getNextCallSeq();
            if (!slept && (sleepOnThisSeq || (sleepOnRenew && scanRequest.getRenew()))) {
                sleepForTimeout("SLEEPING");
                slept = true;
                tryNumber++;
            }
            return response;
        }

        /** Logs {@code label} followed by the sleep time, then sleeps for that many ms. */
        private static void sleepForTimeout(String label) {
            try {
                LOG.info("{} {}", label, sleepTime);
                Thread.sleep(sleepTime);
            } catch (InterruptedException ignored) {
                // Deliberately ignored, as before: an interrupted sleep just ends early and
                // the already-computed response is still returned to the client.
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/client/TestClientScannerTimeouts$RegionServerWithScanTimeout.class */
    /**
     * Mini-cluster region server whose RPC services are a
     * {@link RSRpcServicesWithScanTimeout}. setUpBeforeClass() registers it under
     * {@code HConstants.REGION_SERVER_IMPL}.
     */
    private static class RegionServerWithScanTimeout extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
        public RegionServerWithScanTimeout(Configuration configuration) throws IOException, InterruptedException {
            super(configuration);
        }

        /** Creates the fault-injecting RPC services bound to this region server. */
        @Override
        protected RSRpcServices createRpcServices() throws IOException {
            final RSRpcServices faultInjectingServices = new RSRpcServicesWithScanTimeout(this);
            return faultInjectingServices;
        }
    }

    /**
     * Parameter sets for the {@link Parameterized} runner; each one populates
     * {@link #useScannerTimeoutPeriodForNextCalls}.
     */
    @Parameterized.Parameters
    public static Collection<Object[]> parameters() {
        final Collection<Object[]> booleanCases = HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED;
        return booleanCases;
    }

    /**
     * Configures the shared testing utility (RPC timeout, retry count and pause, and the
     * fault-injecting region server implementation) and starts a one-node mini cluster.
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        final Configuration clusterConf = TEST_UTIL.getConfiguration();
        clusterConf.setInt("hbase.regionserver.msginterval", 30000);
        clusterConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
        final String regionServerImpl = RegionServerWithScanTimeout.class.getName();
        clusterConf.setStrings(HConstants.REGION_SERVER_IMPL, regionServerImpl);
        clusterConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
        clusterConf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
        TEST_UTIL.startMiniCluster(1);
    }

    /**
     * Builds a per-test copy of the cluster configuration carrying the scanner and meta
     * timeouts plus the parameterized flag, then opens the async and sync connections
     * from it.
     */
    @Before
    public void setUp() throws Exception {
        final Configuration clientConf = new Configuration(TEST_UTIL.getConfiguration());
        clientConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout);
        clientConf.setInt(ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaReadRpcTimeout);
        clientConf.setInt(ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout);
        clientConf.setBoolean(
            ConnectionConfiguration.HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS,
            this.useScannerTimeoutPeriodForNextCalls);
        // The async connection is created first, then the sync one.
        this.ASYNC_CONN = ConnectionFactory.createAsyncConnection(clientConf).get();
        this.CONN = ConnectionFactory.createConnection(clientConf);
    }

    /**
     * Closes both per-test connections.
     *
     * <p>Each connection is null-checked because JUnit runs {@code @After} methods even when
     * {@code setUp()} failed part-way, and the async connection is closed in a
     * {@code finally} block so that a failure closing the sync connection cannot leak it.
     */
    @After
    public void after() throws Exception {
        try {
            if (this.CONN != null) {
                this.CONN.close();
            }
        } finally {
            if (this.ASYNC_CONN != null) {
                this.ASYNC_CONN.close();
            }
        }
    }

    /** Stops the mini cluster that {@link #setUpBeforeClass()} started. */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        final HBaseTestingUtility util = TEST_UTIL;
        util.shutdownMiniCluster();
    }

    /**
     * Creates and populates the table for the current test and resets fault injection.
     *
     * <p>The table name is derived from the test method name (non-alphanumerics replaced by
     * {@code _}) plus the parameterized flag. Rows {@code ROW0}..{@code ROW3} are written and
     * the region locations are fetched. Fault injection is reset both before and after this
     * work.
     *
     * @param z when true, the table is created in the system namespace
     * @throws IOException if table creation, the puts or the region lookup fail
     */
    public void setup(boolean z) throws IOException {
        RSRpcServicesWithScanTimeout.reset();
        String str = this.name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_") + "-" + this.useScannerTimeoutPeriodForNextCalls;
        if (z) {
            str = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + str;
        }
        tableName = TableName.valueOf(str);
        TEST_UTIL.createTable(tableName, FAMILY);
        // try-with-resources: the Table handle was previously never closed.
        try (Table table = this.CONN.getTable(tableName)) {
            putToTable(table, ROW0);
            putToTable(table, ROW1);
            putToTable(table, ROW2);
            putToTable(table, ROW3);
            LOG.info("Wrote our four values");
            table.getRegionLocator().getAllRegionLocations();
        }
        // Reset again so the test body starts from default switches and counters.
        RSRpcServicesWithScanTimeout.reset();
    }

    /**
     * Asserts that {@code result} carries the expected row key.
     *
     * @param bArr   the expected row key
     * @param result the result whose row key is checked
     */
    private void expectRow(byte[] bArr, Result result) {
        final String failureMessage = "Expected row: " + Bytes.toString(bArr);
        final boolean sameRow = Bytes.equals(bArr, result.getRow());
        Assert.assertTrue(failureMessage, sameRow);
    }

    /**
     * Asserts the number of injected faults counted by the RPC services, then zeroes the
     * counter for the next check.
     *
     * @param i the expected value of the counter
     */
    private void expectNumTries(int i) {
        final String failureMessage =
            "Expected tryNumber=" + i + ", actual=" + RSRpcServicesWithScanTimeout.tryNumber;
        Assert.assertEquals(failureMessage, i, RSRpcServicesWithScanTimeout.tryNumber);
        RSRpcServicesWithScanTimeout.tryNumber = 0;
    }

    /** Runs the OutOfOrderScannerNextException retry scenario with the sync scanner. */
    @Test
    public void testRetryOutOfOrderScannerNextException() throws IOException {
        expectRetryOutOfOrderScannerNext(() -> getScanner(this.CONN));
    }

    /** Runs the OutOfOrderScannerNextException retry scenario with the async-backed scanner. */
    @Test
    public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException {
        final Supplier<ResultScanner> asyncScanners = this::getAsyncScanner;
        expectRetryOutOfOrderScannerNext(asyncScanners);
    }

    /** Sync next() timeout on a regular table, using the normal RPC and scanner timeouts. */
    @Test
    public void testNormalScanTimeoutOnNext() throws IOException {
        final boolean systemTable = false;
        setup(systemTable);
        testScanTimeoutOnNext(rpcTimeout, scanTimeout);
    }

    /** Async next() timeout on a regular table; expects at least the scanner timeout to elapse. */
    @Test
    public void testNormalScanTimeoutOnNextAsync() throws IOException {
        final boolean systemTable = false;
        setup(systemTable);
        final Supplier<ResultScanner> asyncScanners = this::getAsyncScanner;
        expectTimeoutOnNext(scanTimeout, asyncScanners);
    }

    /** Sync open-scanner timeout on a regular table; expects at least the RPC timeout to elapse. */
    @Test
    public void testNormalScanTimeoutOnOpenScanner() throws IOException {
        final boolean systemTable = false;
        setup(systemTable);
        final Supplier<ResultScanner> syncScanners = this::getScanner;
        expectTimeoutOnOpenScanner(rpcTimeout, syncScanners);
    }

    /** Async open-scanner timeout on a regular table; expects at least the RPC timeout to elapse. */
    @Test
    public void testNormalScanTimeoutOnOpenScannerAsync() throws IOException {
        final boolean systemTable = false;
        setup(systemTable);
        final Supplier<ResultScanner> asyncScanners = this::getAsyncScanner;
        expectTimeoutOnOpenScanner(rpcTimeout, asyncScanners);
    }

    /** Sync next() timeout on a system-namespace table, using the meta read and scan timeouts. */
    @Test
    public void testMetaScanTimeoutOnNext() throws IOException {
        final boolean systemTable = true;
        setup(systemTable);
        testScanTimeoutOnNext(metaReadRpcTimeout, metaScanTimeout);
    }

    /**
     * Runs the next()-timeout scenario with whichever timeout the parameterized flag selects.
     *
     * <p>When {@link #useScannerTimeoutPeriodForNextCalls} is set, the shared connection is
     * used and {@code i2} is the expected minimum duration. Otherwise a temporary connection
     * with retries disabled is used, {@code i} is the expected minimum, and the connection is
     * closed afterwards.
     *
     * @param i  expected minimum duration (ms) when the flag is off
     * @param i2 expected minimum duration (ms) when the flag is on
     * @throws IOException if scanning or closing the temporary connection fails
     */
    private void testScanTimeoutOnNext(int i, int i2) throws IOException {
        if (this.useScannerTimeoutPeriodForNextCalls) {
            expectTimeoutOnNext(i2, this::getScanner);
            return;
        }
        // Replaces the hand-expanded close/addSuppressed logic with the equivalent
        // try-with-resources, which closes exactly once on every path.
        try (Connection noRetriesConnection = getNoRetriesConnection()) {
            expectTimeoutOnNext(i, () -> getScanner(noRetriesConnection));
        }
    }

    /**
     * Opens a new connection configured like {@code CONN} but with the client retry count
     * set to 0. The caller is responsible for closing it.
     *
     * @return the newly created connection
     * @throws IOException if the connection cannot be created
     */
    private Connection getNoRetriesConnection() throws IOException {
        final Configuration noRetriesConf = new Configuration(this.CONN.getConfiguration());
        noRetriesConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
        final Connection connection = ConnectionFactory.createConnection(noRetriesConf);
        return connection;
    }

    /** Async next() timeout on a system-namespace table; expects at least the meta scan timeout. */
    @Test
    public void testMetaScanTimeoutOnNextAsync() throws IOException {
        final boolean systemTable = true;
        setup(systemTable);
        final Supplier<ResultScanner> asyncScanners = this::getAsyncScanner;
        expectTimeoutOnNext(metaScanTimeout, asyncScanners);
    }

    /** Sync open-scanner timeout on a system-namespace table; expects at least the meta read RPC timeout. */
    @Test
    public void testMetaScanTimeoutOnOpenScanner() throws IOException {
        final boolean systemTable = true;
        setup(systemTable);
        final Supplier<ResultScanner> syncScanners = this::getScanner;
        expectTimeoutOnOpenScanner(metaReadRpcTimeout, syncScanners);
    }

    /** Async open-scanner timeout on a system-namespace table; expects at least the meta read RPC timeout. */
    @Test
    public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException {
        final boolean systemTable = true;
        setup(systemTable);
        final Supplier<ResultScanner> asyncScanners = this::getAsyncScanner;
        expectTimeoutOnOpenScanner(metaReadRpcTimeout, asyncScanners);
    }

    /** Sync renewLease() timeout on a regular table; expects at least the RPC timeout to elapse. */
    @Test
    public void testNormalScanTimeoutOnRenewLease() throws IOException {
        final boolean systemTable = false;
        setup(systemTable);
        final Supplier<ResultScanner> syncScanners = this::getScanner;
        expectTimeoutOnRenewScanner(rpcTimeout, syncScanners);
    }

    /** Sync renewLease() timeout on a system-namespace table; expects at least the meta read RPC timeout. */
    @Test
    public void testMetaScanTimeoutOnRenewLease() throws IOException {
        final boolean systemTable = true;
        setup(systemTable);
        final Supplier<ResultScanner> syncScanners = this::getScanner;
        expectTimeoutOnRenewScanner(metaReadRpcTimeout, syncScanners);
    }

    /** Sync close() timeout on a regular table; expects at least the RPC timeout to elapse. */
    @Test
    public void testNormalScanTimeoutOnClose() throws IOException {
        final boolean systemTable = false;
        setup(systemTable);
        final Supplier<ResultScanner> syncScanners = this::getScanner;
        expectTimeoutOnCloseScanner(rpcTimeout, syncScanners);
    }

    /** Sync close() timeout on a system-namespace table; expects at least the meta read RPC timeout. */
    @Test
    public void testMetaScanTimeoutOnClose() throws IOException {
        final boolean systemTable = true;
        setup(systemTable);
        final Supplier<ResultScanner> syncScanners = this::getScanner;
        expectTimeoutOnCloseScanner(metaReadRpcTimeout, syncScanners);
    }

    /**
     * Verifies that a scan still returns all four rows, in order, when the server throws
     * {@link OutOfOrderScannerNextException}.
     *
     * <p>Phase one arms a single throw on nextCallSeq 1 and checks the injected-fault counter
     * after each of the first three rows (0, 0, then 1). Phase two opens a fresh scanner with
     * {@code throwAlways} set and expects all four rows plus a counter of rows minus one.
     *
     * @param supplier creates the scanner under test (sync or async backed)
     * @throws IOException if table setup or scanning fails
     */
    private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> supplier) throws IOException {
        setup(false);
        RSRpcServicesWithScanTimeout.seqNoToThrowOn = 1L;
        LOG.info("Opening scanner, expecting no errors from first next() call from openScanner response");
        ResultScanner resultScanner = supplier.get();
        expectRow(ROW0, resultScanner.next());
        expectNumTries(0);
        LOG.info("Making first next() RPC, expecting no errors for seqNo 0");
        expectRow(ROW1, resultScanner.next());
        expectNumTries(0);
        LOG.info("Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry");
        expectRow(ROW2, resultScanner.next());
        expectNumTries(1);
        RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1L;
        LOG.info("Finishing scan, expecting no errors");
        expectRow(ROW3, resultScanner.next());
        resultScanner.close();

        LOG.info("Testing always throw exception");
        // Must be an array of row keys (byte[][]); the previous byte[] declaration could not
        // hold byte[] elements.
        byte[][] expectedRows = {ROW0, ROW1, ROW2, ROW3};
        int rowsSeen = 0;
        ResultScanner alwaysThrowingScanner = supplier.get();
        RSRpcServicesWithScanTimeout.throwAlways = true;
        while (true) {
            LOG.info("Calling scanner.next()");
            Result next = alwaysThrowingScanner.next();
            if (next == null) {
                break;
            }
            expectRow(expectedRows[rowsSeen++], next);
        }
        // Together with the per-row checks above, this shows exactly the expected rows came back.
        Assert.assertEquals("Expected to exhaust expectedResults array length=" + expectedRows.length + ", actual index=" + rowsSeen, expectedRows.length, rowsSeen);
        expectNumTries(expectedRows.length - 1);
    }

    /**
     * Arms a server-side sleep on nextCallSeq 1 and expects the third {@code next()} call to
     * fail with a timeout after at least {@code i} ms.
     *
     * @param i        timeout under test, in ms; the server sleeps 500 ms longer than this
     * @param supplier creates the scanner under test
     * @throws IOException if an unexpected scan failure occurs
     */
    private void expectTimeoutOnNext(int i, Supplier<ResultScanner> supplier) throws IOException {
        RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1L;
        RSRpcServicesWithScanTimeout.setSleepForTimeout(i);

        LOG.info("Opening scanner, expecting no timeouts from first next() call from openScanner response");
        final ResultScanner scanner = supplier.get();
        expectRow(ROW0, scanner.next());

        LOG.info("Making first next() RPC, expecting no timeout for seqNo 0");
        expectRow(ROW1, scanner.next());

        LOG.info("Making second next() RPC, expecting timeout");
        final long startNanos = System.nanoTime();
        try {
            scanner.next();
            Assert.fail("Expected CallTimeoutException");
        } catch (RetriesExhaustedException e) {
            final Throwable cause = e.getCause();
            final boolean isTimeout =
                cause instanceof CallTimeoutException || cause instanceof SocketTimeoutException;
            Assert.assertTrue("Expected CallTimeoutException", isTimeout);
        }
        expectTimeout(startNanos, i);
    }

    /**
     * Arms a server-side sleep on scanner open and expects opening the scanner and fetching
     * its first row to fail with a timeout after at least {@code i} ms.
     *
     * @param i        timeout under test, in ms; the server sleeps 500 ms longer than this
     * @param supplier creates the scanner under test
     * @throws IOException if an unexpected scan failure occurs
     */
    private void expectTimeoutOnOpenScanner(int i, Supplier<ResultScanner> supplier) throws IOException {
        RSRpcServicesWithScanTimeout.setSleepForTimeout(i);
        RSRpcServicesWithScanTimeout.sleepOnOpen = true;

        LOG.info("Opening scanner, expecting timeout from first next() call from openScanner response");
        final long startNanos = System.nanoTime();
        try {
            final ResultScanner scanner = supplier.get();
            scanner.next();
            Assert.fail("Expected SocketTimeoutException or CallTimeoutException");
        } catch (RetriesExhaustedException e) {
            LOG.info("Got error", e);
            final boolean isTimeout =
                e.getCause() instanceof CallTimeoutException || e.getCause() instanceof SocketTimeoutException;
            Assert.assertTrue("Expected SocketTimeoutException or CallTimeoutException, but was " + e.getCause(), isTimeout);
        }
        expectTimeout(startNanos, i);
    }

    /**
     * Arms a server-side sleep on lease renewal and expects {@code renewLease()} to return
     * false, with at least {@code i} ms elapsed since just before the scanner was opened.
     *
     * @param i        timeout under test, in ms; the server sleeps 500 ms longer than this
     * @param supplier creates the scanner under test
     * @throws IOException if an unexpected scan failure occurs
     */
    private void expectTimeoutOnRenewScanner(int i, Supplier<ResultScanner> supplier) throws IOException {
        RSRpcServicesWithScanTimeout.setSleepForTimeout(i);
        RSRpcServicesWithScanTimeout.sleepOnRenew = true;

        LOG.info("Opening scanner, expecting no timeouts from first next() call from openScanner response");
        final long startNanos = System.nanoTime();
        final ResultScanner scanner = supplier.get();
        scanner.next();

        final boolean renewed = scanner.renewLease();
        Assert.assertFalse("Expected renewLease to fail due to timeout", renewed);
        expectTimeout(startNanos, i);
    }

    /**
     * Arms a server-side sleep on scanner close and expects open + first next() + close()
     * to take at least {@code i} ms in total.
     *
     * @param i        timeout under test, in ms; the server sleeps 500 ms longer than this
     * @param supplier creates the scanner under test
     * @throws IOException if an unexpected scan failure occurs
     */
    private void expectTimeoutOnCloseScanner(int i, Supplier<ResultScanner> supplier) throws IOException {
        RSRpcServicesWithScanTimeout.setSleepForTimeout(i);
        RSRpcServicesWithScanTimeout.sleepOnClose = true;

        LOG.info("Opening scanner, expecting no timeouts from first next() call from openScanner response");
        final long startNanos = System.nanoTime();
        final ResultScanner scanner = supplier.get();
        scanner.next();
        scanner.close();

        expectTimeout(startNanos, i);
    }

    /**
     * Asserts that at least {@code i} milliseconds have passed since {@code j}.
     *
     * @param j start time, as returned by {@link System#nanoTime()}
     * @param i minimum expected elapsed time in milliseconds
     */
    private void expectTimeout(long j, int i) {
        final long elapsedNanos = System.nanoTime() - j;
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
        LOG.info("Expected duration >= {}, and got {}", i, elapsedMillis);
        final boolean longEnough = elapsedMillis >= i;
        Assert.assertTrue("Expected duration >= " + i + ", but was " + elapsedMillis, longEnough);
    }

    /** Opens a scanner on the current test table through the shared sync connection. */
    private ResultScanner getScanner() {
        final Connection sharedConnection = this.CONN;
        return getScanner(sharedConnection);
    }

    /**
     * Opens a scanner with caching 1 on the current test table.
     *
     * @param connection the connection to obtain the table from
     * @return the opened scanner
     * @throws RuntimeException wrapping any {@link IOException} raised while opening
     */
    private ResultScanner getScanner(Connection connection) {
        final Scan singleRowScan = new Scan();
        singleRowScan.setCaching(1);
        final ResultScanner scanner;
        try {
            final Table table = connection.getTable(tableName);
            scanner = table.getScanner(singleRowScan);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return scanner;
    }

    /** Opens a scanner with caching 1 on the current test table through the async connection. */
    private ResultScanner getAsyncScanner() {
        final Scan singleRowScan = new Scan();
        singleRowScan.setCaching(1);
        final ResultScanner scanner = this.ASYNC_CONN.getTable(tableName).getScanner(singleRowScan);
        return scanner;
    }

    /**
     * Writes the test cell (FAMILY:QUALIFIER = VALUE) for one row.
     *
     * @param table the table to write to
     * @param bArr  the row key
     * @throws IOException if the put fails
     */
    private void putToTable(Table table, byte[] bArr) throws IOException {
        final Put rowPut = new Put(bArr);
        rowPut.addColumn(FAMILY, QUALIFIER, VALUE);
        table.put(rowPut);
    }
}
