package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.log4j.Level;
import org.junit.Test;

/* loaded from: input_file:lib/hadoop-hdfs-0.23.4-tests.jar:org/apache/hadoop/hdfs/TestMultiThreadedHflush.class */
/**
 * Exercises {@code hflush()} on a single {@link FSDataOutputStream} that is
 * shared by several threads: once with many concurrent writers, and once with
 * flusher threads racing against the stream being closed.
 *
 * <p>The class can also be run from the command line through
 * {@link #main(String[])} to time the multi-threaded write loop against a
 * caller-supplied path.
 */
public class TestMultiThreadedHflush {
    static final int blockSize = 1048576;
    static final int numBlocks = 10;
    /** One byte more than {@code numBlocks} full blocks (10485761). */
    static final int fileSize = numBlocks * blockSize + 1;
    private static final int NUM_THREADS = 10;
    private static final int WRITE_SIZE = 517;
    private static final int NUM_WRITES_PER_THREAD = 1000;

    /** Payload written by every {@link WriterThread}; set by {@link #initBuffer(int)}. */
    private byte[] toWrite = null;

    /**
     * Worker that waits on a shared latch and then repeatedly writes the
     * enclosing test's buffer to the shared stream, calling {@code hflush()}
     * after each write.
     *
     * <p>The first {@link Throwable} raised by any worker is recorded in the
     * shared {@link AtomicReference}; every worker stops looping as soon as
     * that reference is non-null.
     */
    public class WriterThread extends Thread {
        private final FSDataOutputStream stm;
        private final AtomicReference<Throwable> thrown;
        private final int numWrites;
        private final CountDownLatch countdown;

        /**
         * @param fSDataOutputStream stream shared by all writers
         * @param atomicReference    holder for the first failure seen by any writer
         * @param countDownLatch     latch the thread awaits before it starts writing
         * @param i                  maximum number of write+hflush rounds to perform
         */
        public WriterThread(FSDataOutputStream fSDataOutputStream, AtomicReference<Throwable> atomicReference, CountDownLatch countDownLatch, int i) {
            this.stm = fSDataOutputStream;
            this.thrown = atomicReference;
            this.numWrites = i;
            this.countdown = countDownLatch;
        }

        @Override
        public void run() {
            try {
                this.countdown.await();
                // Stop early once any writer (including this one) has failed.
                for (int i = 0; i < this.numWrites && this.thrown.get() == null; i++) {
                    doAWrite();
                }
            } catch (Throwable th) {
                // Keep only the first failure; later ones are dropped.
                this.thrown.compareAndSet(null, th);
            }
        }

        /** Writes the shared buffer once and flushes it with {@code hflush()}. */
        private void doAWrite() throws IOException {
            this.stm.write(TestMultiThreadedHflush.this.toWrite);
            this.stm.hflush();
        }
    }

    /** Raises the log level of the HDFS loggers used by this test to {@code ALL}. */
    public TestMultiThreadedHflush() {
        ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
        ((Log4JLogger) LeaseManager.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger) LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
        ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger) InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
    }

    /**
     * Creates (overwriting if present) a file with the given replication
     * factor and a block size of {@link #blockSize}. The I/O buffer size is
     * read from the file system's configuration, defaulting to 4096.
     */
    private FSDataOutputStream createFile(FileSystem fileSystem, Path path, int replication) throws IOException {
        int bufferSize = fileSystem.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, 4096);
        return fileSystem.create(path, true, bufferSize, (short) replication, blockSize);
    }

    /** Fills {@link #toWrite} with {@code size} random bytes from {@code AppendTestUtil}. */
    private void initBuffer(int size) {
        this.toWrite = AppendTestUtil.randomBytes(AppendTestUtil.nextLong(), size);
    }

    /**
     * Runs {@link #doMultithreadedWrites} against a freshly built
     * {@link MiniDFSCluster}, always closing the file system and shutting the
     * cluster down afterwards.
     */
    @Test
    public void testMultipleHflushers() throws Exception {
        Configuration conf = new Configuration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
        FileSystem fs = cluster.getFileSystem();
        try {
            doMultithreadedWrites(conf, new Path("/multiple-hflushers.dat"), NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
        } finally {
            fs.close();
            cluster.shutdown();
        }
    }

    /**
     * Starts several threads that call {@code hflush()} in a tight loop, then
     * writes to and closes the stream underneath them.
     *
     * <p>A flusher exits quietly when it sees an {@link IOException} whose
     * text contains "DFSOutputStream is closed"; any other failure is recorded
     * and rethrown by the test once all flushers have finished.
     */
    @Test
    public void testHflushWhileClosing() throws Throwable {
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration()).build();
        FileSystem fs = cluster.getFileSystem();
        // The cleanup must wrap the whole test: tearing the cluster down
        // inside the thread-creation loop would kill it before the writes
        // and the close below ever run.
        try {
            final FSDataOutputStream stm = createFile(fs, new Path("/hflush-and-close.dat"), 1);
            final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
            ArrayList<Thread> flushers = new ArrayList<Thread>();

            for (int i = 0; i < NUM_THREADS; i++) {
                Thread flusher = new Thread() {
                    @Override
                    public void run() {
                        try {
                            while (true) {
                                try {
                                    stm.hflush();
                                } catch (IOException e) {
                                    if (e.toString().contains("DFSOutputStream is closed")) {
                                        // Expected once the main thread has closed the stream.
                                        return;
                                    }
                                    throw e;
                                }
                            }
                        } catch (Throwable th) {
                            thrown.set(th);
                        }
                    }
                };
                flusher.start();
                flushers.add(flusher);
            }

            // Write some data while the flushers are running.
            for (int i = 0; i < 10000; i++) {
                stm.write(1);
            }

            // Close the stream while the flushers are still flushing.
            stm.close();

            // Wait for every flusher to terminate.
            for (Thread flusher : flushers) {
                flusher.join();
            }

            // Anything other than the expected "closed" exception is a failure.
            Throwable failure = thrown.get();
            if (failure != null) {
                throw failure;
            }
        } finally {
            fs.close();
            cluster.shutdown();
        }
    }

    /**
     * Creates {@code path}, performs a few initial flushes, then has
     * {@code numThreads} {@link WriterThread}s each perform up to
     * {@code numWrites} rounds of "write {@code writeSize} random bytes, then
     * hflush" on the same stream. All writers are released together by a
     * single latch.
     *
     * @param configuration configuration used to resolve the file system of {@code path}
     * @param path          file to create and write to
     * @param numThreads    number of concurrent writer threads
     * @param writeSize     size in bytes of the buffer each write sends
     * @param numWrites     maximum number of writes per thread
     * @throws RuntimeException wrapping the first failure recorded by any writer
     * @throws Exception        if file creation, flushing, joining or closing fails
     */
    public void doMultithreadedWrites(Configuration configuration, Path path, int numThreads, int writeSize, int numWrites) throws Exception {
        initBuffer(writeSize);
        FSDataOutputStream stm = createFile(path.getFileSystem(configuration), path, 1);
        System.out.println("Created file simpleFlush.dat");

        // Flush before any data, then again around a single-byte write.
        stm.hflush();
        stm.hflush();
        stm.write(1);
        stm.hflush();
        stm.hflush();

        CountDownLatch startLatch = new CountDownLatch(1);
        ArrayList<Thread> writers = new ArrayList<Thread>();
        AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
        for (int i = 0; i < numThreads; i++) {
            WriterThread writer = new WriterThread(stm, thrown, startLatch, numWrites);
            writers.add(writer);
            writer.start();
        }

        // Release all writers at once, then wait for them to finish.
        startLatch.countDown();
        for (Thread writer : writers) {
            writer.join();
        }

        // NOTE: on a writer failure the stream is not closed here.
        Throwable failure = thrown.get();
        if (failure != null) {
            throw new RuntimeException("Deferred", failure);
        }
        stm.close();
        System.out.println("Closed file.");
    }

    /**
     * Command-line entry point: runs the multi-threaded write loop against the
     * path given as the single argument and prints the elapsed time in
     * milliseconds. Prints a usage line and exits with status 1 when the
     * argument count is not exactly one.
     */
    public static void main(String[] strArr) throws Exception {
        if (strArr.length != 1) {
            System.err.println(HelpFormatter.DEFAULT_SYNTAX_PREFIX + TestMultiThreadedHflush.class.getSimpleName() + " <path to test file> ");
            System.exit(1);
        }
        TestMultiThreadedHflush test = new TestMultiThreadedHflush();
        Configuration conf = new Configuration();
        Path path = new Path(strArr[0]);
        long startNanos = System.nanoTime();
        test.doMultithreadedWrites(conf, path, NUM_THREADS, 511, 50000);
        long elapsedMillis = (System.nanoTime() - startNanos) / 1000000;
        System.out.println("Finished in " + elapsedMillis + "ms");
    }
}
