package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/TestBatchIbr.class */
public class TestBatchIbr {
    // Sizing parameters of the test. In the visible code these constants are not
    // referenced by name; the methods below use the same values as literals.

    /** Number of datanodes; runIbrTest starts its mini cluster with 4. */
    private static final short NUM_DATANODES = 4;
    /** Size in bytes of one written chunk; matches the 1024-byte thread-local buffers. */
    private static final int BLOCK_SIZE = 1024;
    /** Upper bound on chunks per file; runIbrTest draws the count from 1..8. */
    private static final int MAX_BLOCK_NUM = 8;
    /** Number of files; runIbrTest creates and then verifies 1000 files. */
    private static final int NUM_FILES = 1000;
    /** Worker thread count; createExecutor builds a fixed pool of 128 threads. */
    private static final int NUM_THREADS = 128;
    /** Logger shared by all static helpers of this test. */
    public static final Log LOG = LogFactory.getLog(TestBatchIbr.class);
    /** Per-thread buffer: data to write in createFile, data read back in verifyFile. */
    private static final ThreadLocalBuffer IO_BUF = new ThreadLocalBuffer();
    /** Per-thread buffer holding the expected bytes regenerated in verifyFile. */
    private static final ThreadLocalBuffer VERIFY_BUF = new ThreadLocalBuffer();

    /**
     * A {@link ThreadLocal} that gives every thread its own 1024-byte scratch
     * array, allocated lazily on that thread's first {@code get()}.
     */
    public static class ThreadLocalBuffer extends ThreadLocal<byte[]> {
        ThreadLocalBuffer() {
            super();
        }

        /**
         * Allocates the per-thread buffer.
         *
         * @return a new zero-filled array of 1024 bytes
         */
        @Override
        public byte[] initialValue() {
            final int capacity = 1024;
            return new byte[capacity];
        }
    }

    /**
     * Builds the HDFS configuration for one test run.
     *
     * <p>The namenode minimum block size is set to 1024 and best-effort
     * datanode replacement on write failure is enabled.
     *
     * @param j value for the incremental block report interval key (msec);
     *          when it is not positive the key is left untouched
     * @return the freshly created configuration
     * @throws IOException declared for callers; not thrown by the visible code
     */
    static HdfsConfiguration newConf(long j) throws IOException {
        final HdfsConfiguration conf = new HdfsConfiguration();
        conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 1024L);
        conf.setBoolean(
                DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
                true);

        final boolean overrideIbrInterval = j > 0;
        if (overrideIbrInterval) {
            conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, j);
        }
        return conf;
    }

    /**
     * Creates the fixed pool of 128 worker threads used by the test.
     *
     * <p>Before returning, 128 trivial tasks are run that touch both
     * thread-local buffers, so buffers are allocated up front on the threads
     * that execute them. Nothing forces each task onto a distinct thread, so
     * this is a best-effort warm-up.
     *
     * <p>If the warm-up fails or is interrupted, the pool is shut down before
     * the exception propagates so its threads are not leaked.
     *
     * @return the running, warmed-up executor; the caller must shut it down
     * @throws Exception if a warm-up task fails or the wait is interrupted
     */
    static ExecutorService createExecutor() throws Exception {
        final int numThreads = 128;
        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        boolean warmedUp = false;
        try {
            final ExecutorCompletionService<Path> completion =
                    new ExecutorCompletionService<Path>(executor);
            for (int i = 0; i < numThreads; i++) {
                completion.submit(new Callable<Path>() {
                    @Override
                    public Path call() throws Exception {
                        // Trigger the lazy allocation of this thread's buffers.
                        TestBatchIbr.IO_BUF.get();
                        TestBatchIbr.VERIFY_BUF.get();
                        return null;
                    }
                });
            }
            // Wait for every warm-up task and surface any failure.
            for (int i = 0; i < numThreads; i++) {
                completion.take().get();
            }
            warmedUp = true;
            return executor;
        } finally {
            if (!warmedUp) {
                executor.shutdownNow();
            }
        }
    }

    /**
     * Runs one create-then-verify workload against a 4-datanode mini cluster
     * and logs timing and incremental block report counters.
     *
     * <p>1000 file-creation tasks are submitted to the worker pool; as each
     * file completes, a verification task for it is submitted to the same
     * pool. The run fails if any verification returns {@code false} or any
     * task throws.
     *
     * <p>The executor and the cluster are always shut down, including when
     * cluster start-up itself fails.
     *
     * @param j incremental block report interval passed to {@code newConf}
     * @throws Exception if cluster start-up, any task, or an assertion fails
     */
    static void runIbrTest(long j) throws Exception {
        final ExecutorService executor = createExecutor();
        MiniDFSCluster cluster = null;
        try {
            final Random random = new Random();
            final HdfsConfiguration conf = newConf(j);
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
            final DistributedFileSystem dfs = cluster.getFileSystem();

            final Path dir = new Path("/dir");
            dfs.mkdirs(dir);

            final long testStartTime = Time.monotonicNow();

            // Phase 1: create the files concurrently.
            final ExecutorCompletionService<Path> createService =
                    new ExecutorCompletionService<Path>(executor);
            final AtomicLong createFileTime = new AtomicLong();
            final AtomicInteger numBlockCreated = new AtomicInteger();
            for (int i = 0; i < 1000; i++) {
                createService.submit(new Callable<Path>() {
                    @Override
                    public Path call() throws Exception {
                        final long start = Time.monotonicNow();
                        try {
                            final long seed = random.nextLong();
                            final int numBlocks = random.nextInt(8) + 1;
                            numBlockCreated.addAndGet(numBlocks);
                            return TestBatchIbr.createFile(dir, numBlocks, seed, dfs);
                        } finally {
                            // Accumulate elapsed time on success and failure alike.
                            createFileTime.addAndGet(Time.monotonicNow() - start);
                        }
                    }
                });
            }

            // Phase 2: verify each file as soon as its creation completes.
            final ExecutorCompletionService<Boolean> verifyService =
                    new ExecutorCompletionService<Boolean>(executor);
            final AtomicLong verifyFileTime = new AtomicLong();
            for (int i = 0; i < 1000; i++) {
                final Path file = createService.take().get();
                verifyService.submit(new Callable<Boolean>() {
                    @Override
                    public Boolean call() throws Exception {
                        final long start = Time.monotonicNow();
                        try {
                            return TestBatchIbr.verifyFile(file, dfs);
                        } finally {
                            verifyFileTime.addAndGet(Time.monotonicNow() - start);
                        }
                    }
                });
            }

            // Phase 3: every verification must have succeeded.
            for (int i = 0; i < 1000; i++) {
                Assert.assertTrue(verifyService.take().get());
            }

            final long testEndTime = Time.monotonicNow();
            LOG.info("ibrInterval=" + j + " (" + toConfString(DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, conf) + "), numBlockCreated=" + numBlockCreated);
            LOG.info("duration=" + toSecondString(testEndTime - testStartTime) + ", createFileTime=" + toSecondString(createFileTime.get()) + ", verifyFileTime=" + toSecondString(verifyFileTime.get()));
            LOG.info("NUM_FILES=1000, MAX_BLOCK_NUM=8, BLOCK_SIZE=1024, NUM_THREADS=128, NUM_DATANODES=4");
            logIbrCounts(cluster.getDataNodes());
        } finally {
            // Nested so that a failure stopping the pool cannot skip the cluster shutdown.
            try {
                executor.shutdown();
            } finally {
                if (cluster != null) {
                    cluster.shutdown();
                }
            }
        }
    }

    /**
     * Renders a configuration entry as {@code key=value} for logging.
     *
     * @param str           the configuration key
     * @param configuration the configuration to look the key up in
     * @return the key, an equals sign and whatever {@code configuration.get(str)} returns
     */
    static String toConfString(String str, Configuration configuration) {
        final String value = configuration.get(str);
        return new StringBuilder()
                .append(str)
                .append("=")
                .append(value)
                .toString();
    }

    /**
     * Formats a millisecond count as seconds with an {@code s} suffix.
     *
     * @param j duration in milliseconds
     * @return the duration divided by 1000 as a double, followed by {@code "s"}
     */
    static String toSecondString(long j) {
        final double seconds = j / 1000.0d;
        return String.valueOf(seconds).concat("s");
    }

    /**
     * Logs the {@code IncrementalBlockReportsNumOps} counter of each datanode.
     *
     * @param list the datanodes whose metrics are reported
     */
    static void logIbrCounts(List<DataNode> list) {
        final String counterName = "IncrementalBlockReportsNumOps";
        for (final DataNode datanode : list) {
            final String displayName = datanode.getDisplayName();
            final long numOps = MetricsAsserts.getLongCounter(
                    counterName,
                    MetricsAsserts.getMetrics(datanode.getMetrics().name()));
            LOG.info(displayName + ": " + counterName + "=" + numOps);
        }
    }

    /**
     * Fills a buffer with a deterministic byte sequence derived from a seed
     * and a block index.
     *
     * <p>The first byte is the low byte of {@code j ^ (j >> i)}; each following
     * byte is the previous one plus one, wrapping around on byte overflow.
     *
     * @param i    block index, used as the shift distance
     * @param j    seed
     * @param bArr buffer to overwrite in place
     * @return the same {@code bArr} instance, now filled
     */
    static byte[] nextBytes(int i, long j, byte[] bArr) {
        final byte first = (byte) (j ^ (j >> i));
        for (int offset = 0; offset < bArr.length; offset++) {
            // The narrowing cast keeps only the low 8 bits, i.e. the wrapped sum.
            bArr[offset] = (byte) (first + offset);
        }
        return bArr;
    }

    /**
     * Creates a file named {@code <j>_<i>} under the given directory and
     * writes {@code i} buffers of generated data into it.
     *
     * <p>The name encodes the seed and the block count so that
     * {@code verifyFile} can regenerate the expected content from the path.
     *
     * <p>The stream is managed by try-with-resources: it is closed on every
     * path, and a failure while closing is attached as a suppressed exception
     * instead of replacing an earlier write failure.
     *
     * @param path                  parent directory
     * @param i                     number of buffers to write
     * @param j                     seed for the generated bytes
     * @param distributedFileSystem file system to create the file in
     * @return the path of the created file
     * @throws IOException if creating, writing or closing the file fails
     */
    static Path createFile(Path path, int i, long j, DistributedFileSystem distributedFileSystem) throws IOException {
        final Path file = new Path(path, j + "_" + i);
        final byte[] buffer = IO_BUF.get();
        try (FSDataOutputStream out = distributedFileSystem.create(file)) {
            for (int block = 0; block < i; block++) {
                out.write(nextBytes(block, j, buffer));
            }
        }
        return file;
    }

    /**
     * Reads a file written by {@code createFile} and checks its content
     * against the regenerated byte sequence.
     *
     * <p>The seed and block count are parsed from the file name, which has the
     * form {@code <seed>_<numBlocks>}. A malformed name makes the parsing
     * throw before any I/O happens.
     *
     * @param path                  file to verify
     * @param distributedFileSystem file system to read from
     * @return {@code true} if every block matched; {@code false} if an
     *         {@link Exception} (for example an I/O error or a truncated file)
     *         occurred while reading. A content mismatch raises the
     *         {@link AssertionError} from {@code assertArrayEquals}, which is
     *         not caught here.
     */
    static boolean verifyFile(Path path, DistributedFileSystem distributedFileSystem) {
        final String name = path.getName();
        final int separator = name.indexOf('_');
        final long seed = Long.parseLong(name.substring(0, separator));
        final int numBlocks = Integer.parseInt(name.substring(separator + 1));

        final byte[] actual = IO_BUF.get();
        final byte[] expected = VERIFY_BUF.get();

        try (FSDataInputStream in = distributedFileSystem.open(path)) {
            for (int block = 0; block < numBlocks; block++) {
                // readFully, not read: a short read would leave stale bytes in the buffer.
                in.readFully(actual);
                nextBytes(block, seed, expected);
                Assert.assertArrayEquals(expected, actual);
            }
            return true;
        } catch (Exception e) {
            // Log the cause as well so a failed verification can be diagnosed.
            LOG.error("Failed to verify file " + path, e);
            return false;
        }
    }

    /**
     * Runs the workload twice: once with interval 0, which leaves the
     * incremental block report interval key unset, and once with the key
     * set to 100.
     */
    @Test
    public void testIbr() throws Exception {
        final long[] ibrIntervals = {0L, 100L};
        for (final long ibrInterval : ibrIntervals) {
            runIbrTest(ibrInterval);
        }
    }

    static {
        GenericTestUtils.setLogLevel(LogFactory.getLog(IncrementalBlockReportManager.class), Level.ALL);
    }
}
