package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.cli.HelpFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.util.Quantile;
import org.apache.hadoop.metrics2.util.SampleQuantiles;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Priority;
import org.junit.Test;

/* JADX WARN: Classes with same name are omitted:
  input_file:hadoop-hdfs-2.7.6/share/hadoop/hdfs/hadoop-hdfs-2.7.6-tests.jar:org/apache/hadoop/hdfs/TestMultiThreadedHflush.class
  input_file:test-classes/org/apache/hadoop/hdfs/TestMultiThreadedHflush.class
 */
/* loaded from: input_file:hadoop-hdfs-2.7.6-tests.jar:org/apache/hadoop/hdfs/TestMultiThreadedHflush.class */
public class TestMultiThreadedHflush {
    static final int blockSize = 1048576;
    private static final int NUM_THREADS = 10;
    private static final int WRITE_SIZE = 517;
    private static final int NUM_WRITES_PER_THREAD = 1000;
    private byte[] toWrite = null;
    private final SampleQuantiles quantiles = new SampleQuantiles(new Quantile[]{new Quantile(0.5d, 0.05d), new Quantile(0.75d, 0.025d), new Quantile(0.9d, 0.01d), new Quantile(0.95d, 0.005d), new Quantile(0.99d, 0.001d)});

    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-hdfs-2.7.6/share/hadoop/hdfs/hadoop-hdfs-2.7.6-tests.jar:org/apache/hadoop/hdfs/TestMultiThreadedHflush$CLIBenchmark.class
      input_file:test-classes/org/apache/hadoop/hdfs/TestMultiThreadedHflush$CLIBenchmark.class
     */
    /* loaded from: input_file:hadoop-hdfs-2.7.6-tests.jar:org/apache/hadoop/hdfs/TestMultiThreadedHflush$CLIBenchmark.class */
    private static class CLIBenchmark extends Configured implements Tool {
        private CLIBenchmark() {
        }

        public int run(String[] strArr) throws Exception {
            if (strArr.length != 1) {
                System.err.println(HelpFormatter.DEFAULT_SYNTAX_PREFIX + TestMultiThreadedHflush.class.getSimpleName() + " <path to test file> ");
                System.err.println("Configurations settable by -D options:\n  num.threads [default 10] - how many threads to run\n  write.size [default 511] - bytes per write\n  num.writes [default 50000] - how many writes to perform");
                System.exit(1);
            }
            TestMultiThreadedHflush testMultiThreadedHflush = new TestMultiThreadedHflush();
            Configuration conf = getConf();
            Path path = new Path(strArr[0]);
            int i = conf.getInt("num.threads", 10);
            int i2 = conf.getInt("write.size", 511);
            int i3 = conf.getInt("num.writes", Priority.FATAL_INT);
            int i4 = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
            StopWatch start = new StopWatch().start();
            testMultiThreadedHflush.doMultithreadedWrites(conf, path, i, i2, i3, i4);
            start.stop();
            System.out.println("Finished in " + start.now(TimeUnit.MILLISECONDS) + "ms");
            System.out.println("Latency quantiles (in microseconds):\n" + testMultiThreadedHflush.quantiles);
            return 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-hdfs-2.7.6/share/hadoop/hdfs/hadoop-hdfs-2.7.6-tests.jar:org/apache/hadoop/hdfs/TestMultiThreadedHflush$WriterThread.class
      input_file:test-classes/org/apache/hadoop/hdfs/TestMultiThreadedHflush$WriterThread.class
     */
    /* loaded from: input_file:hadoop-hdfs-2.7.6-tests.jar:org/apache/hadoop/hdfs/TestMultiThreadedHflush$WriterThread.class */
    public class WriterThread extends Thread {
        private final FSDataOutputStream stm;
        private final AtomicReference<Throwable> thrown;
        private final int numWrites;
        private final CountDownLatch countdown;

        public WriterThread(FSDataOutputStream fSDataOutputStream, AtomicReference<Throwable> atomicReference, CountDownLatch countDownLatch, int i) {
            this.stm = fSDataOutputStream;
            this.thrown = atomicReference;
            this.numWrites = i;
            this.countdown = countDownLatch;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.countdown.await();
                for (int i = 0; i < this.numWrites && this.thrown.get() == null; i++) {
                    doAWrite();
                }
            } catch (Throwable th) {
                this.thrown.compareAndSet(null, th);
            }
        }

        private void doAWrite() throws IOException {
            StopWatch start = new StopWatch().start();
            this.stm.write(TestMultiThreadedHflush.this.toWrite);
            this.stm.hflush();
            TestMultiThreadedHflush.this.quantiles.insert(start.now(TimeUnit.MICROSECONDS));
        }
    }

    private FSDataOutputStream createFile(FileSystem fileSystem, Path path, int i) throws IOException {
        return fileSystem.create(path, true, fileSystem.getConf().getInt("io.file.buffer.size", 4096), (short) i, 1048576L);
    }

    private void initBuffer(int i) {
        this.toWrite = AppendTestUtil.randomBytes(AppendTestUtil.nextLong(), i);
    }

    @Test
    public void testMultipleHflushersRepl1() throws Exception {
        doTestMultipleHflushers(1);
    }

    @Test
    public void testMultipleHflushersRepl3() throws Exception {
        doTestMultipleHflushers(3);
    }

    private void doTestMultipleHflushers(int i) throws Exception {
        Configuration configuration = new Configuration();
        MiniDFSCluster build = new MiniDFSCluster.Builder(configuration).numDataNodes(i).build();
        DistributedFileSystem fileSystem = build.getFileSystem();
        try {
            doMultithreadedWrites(configuration, new Path("/multiple-hflushers.dat"), 10, WRITE_SIZE, 1000, i);
            System.out.println("Latency quantiles (in microseconds):\n" + this.quantiles);
            fileSystem.close();
            build.shutdown();
        } catch (Throwable th) {
            fileSystem.close();
            build.shutdown();
            throw th;
        }
    }

    @Test
    public void testHflushWhileClosing() throws Throwable {
        MiniDFSCluster build = new MiniDFSCluster.Builder(new Configuration()).build();
        DistributedFileSystem fileSystem = build.getFileSystem();
        final FSDataOutputStream createFile = createFile(fileSystem, new Path("/hflush-and-close.dat"), 1);
        ArrayList arrayList = new ArrayList();
        final AtomicReference atomicReference = new AtomicReference();
        for (int i = 0; i < 10; i++) {
            try {
                Thread thread = new Thread() { // from class: org.apache.hadoop.hdfs.TestMultiThreadedHflush.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        while (true) {
                            try {
                                createFile.hflush();
                            } catch (ClosedChannelException e) {
                                return;
                            } catch (Throwable th) {
                                atomicReference.set(th);
                                return;
                            }
                        }
                    }
                };
                thread.start();
                arrayList.add(thread);
            } finally {
                fileSystem.close();
                build.shutdown();
            }
        }
        for (int i2 = 0; i2 < 10000; i2++) {
            createFile.write(1);
        }
        createFile.close();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Thread) it.next()).join();
        }
        if (atomicReference.get() != null) {
            throw ((Throwable) atomicReference.get());
        }
    }

    public void doMultithreadedWrites(Configuration configuration, Path path, int i, int i2, int i3, int i4) throws Exception {
        initBuffer(i2);
        FSDataOutputStream createFile = createFile(path.getFileSystem(configuration), path, i4);
        System.out.println("Created file simpleFlush.dat");
        createFile.hflush();
        createFile.hflush();
        createFile.write(1);
        createFile.hflush();
        createFile.hflush();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ArrayList arrayList = new ArrayList();
        AtomicReference atomicReference = new AtomicReference();
        for (int i5 = 0; i5 < i; i5++) {
            WriterThread writerThread = new WriterThread(createFile, atomicReference, countDownLatch, i3);
            arrayList.add(writerThread);
            writerThread.start();
        }
        countDownLatch.countDown();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Thread) it.next()).join();
        }
        if (atomicReference.get() != null) {
            throw new RuntimeException("Deferred", (Throwable) atomicReference.get());
        }
        createFile.close();
        System.out.println("Closed file.");
    }

    public static void main(String[] strArr) throws Exception {
        System.exit(ToolRunner.run(new CLIBenchmark(), strArr));
    }
}
