package org.apache.hadoop.mapred;

import com.facebook.presto.phoenix.shaded.junit.framework.TestCase;
import com.facebook.presto.phoenix.shaded.org.apache.commons.io.IOUtils;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.MapOutputCollector;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapreduce.MRJobConfig;

/* loaded from: input_file:org/apache/hadoop/mapred/TestMerge.class */
public class TestMerge extends TestCase {
    // Sizing parameters of the test.
    // NOTE(review): none of the four numeric constants is referenced by name in this file; the
    // same values appear as literals in the test methods and in MyPartitioner (this looks like
    // compile-time constant inlining) — keep both sides in sync when changing one.
    private static final int NUM_HADOOP_DATA_NODES = 2;
    private static final int NUM_MAPPERS = 10;
    private static final int NUM_REDUCERS = 4;
    private static final int NUM_LINES = 1000;
    // Directory that createInput() fills and that runMergeTest() sets as the job input path.
    private static final Path INPUT_DIR = new Path("/testplugin/input");
    // Job output directory; deleted before the job runs and read back by verifyOutput().
    private static final Path OUTPUT = new Path("/testplugin/output");

    /**
     * Serializes key/value pairs and appends them to an output stream as length-prefixed records.
     *
     * <p>Each record is laid out as: vint key length, vint value length, key bytes, value bytes.
     * {@link #close()} appends two vints of {@code -1} (presumably the end-of-stream marker the
     * reader expects — confirm against the IFile format) and closes the stream.
     *
     * @param <K> exact runtime class of the keys
     * @param <V> exact runtime class of the values
     */
    static class KeyValueWriter<K, V> {
        /** Scratch buffer both serializers write into; emptied after every record. */
        private final DataOutputBuffer dataBuffer = new DataOutputBuffer();
        private final Class<K> keyClass;
        private final Class<V> valueClass;
        private final Serializer<K> keySerializer;
        private final Serializer<V> valueSerializer;
        private final DataOutputStream outputStream;

        /**
         * Creates a writer whose serializers are looked up from {@code configuration}.
         *
         * @param configuration source for the {@link SerializationFactory}
         * @param outputStream destination of the encoded records
         * @param cls key class
         * @param cls2 value class
         * @throws IOException if a serializer cannot be opened
         */
        public KeyValueWriter(Configuration configuration, OutputStream outputStream, Class<K> cls, Class<V> cls2) throws IOException {
            keyClass = cls;
            valueClass = cls2;
            SerializationFactory factory = new SerializationFactory(configuration);
            keySerializer = factory.getSerializer(keyClass);
            keySerializer.open(dataBuffer);
            valueSerializer = factory.getSerializer(valueClass);
            valueSerializer.open(dataBuffer);
            this.outputStream = new DataOutputStream(outputStream);
        }

        /**
         * Appends one record to the stream.
         *
         * @param k key; its class must be exactly the configured key class
         * @param v value; its class must be exactly the configured value class
         * @throws IOException on a class mismatch, a negative computed length, or a write failure
         */
        public void write(K k, V v) throws IOException {
            requireExactClass("key", k.getClass(), keyClass);
            requireExactClass("value", v.getClass(), valueClass);

            // The key goes into the scratch buffer first, so the buffer length is the key length.
            keySerializer.serialize(k);
            final int keyLength = dataBuffer.getLength();
            if (keyLength < 0) {
                throw new IOException("Negative key-length not allowed: " + keyLength + " for " + k);
            }

            // The value is appended behind the key; its length is the growth of the buffer.
            valueSerializer.serialize(v);
            final int valueLength = dataBuffer.getLength() - keyLength;
            if (valueLength < 0) {
                throw new IOException("Negative value-length not allowed: " + valueLength + " for " + v);
            }

            WritableUtils.writeVInt(outputStream, keyLength);
            WritableUtils.writeVInt(outputStream, valueLength);
            outputStream.write(dataBuffer.getData(), 0, dataBuffer.getLength());
            dataBuffer.reset();
        }

        /**
         * Closes both serializers, writes the two trailing {@code -1} vints and closes the stream.
         *
         * @throws IOException if closing or writing fails
         */
        public void close() throws IOException {
            keySerializer.close();
            valueSerializer.close();
            for (int marker = 0; marker < 2; marker++) {
                WritableUtils.writeVInt(outputStream, -1);
            }
            outputStream.close();
        }

        /** Rejects objects whose runtime class is not exactly {@code expected}. */
        private static void requireExactClass(String role, Class<?> actual, Class<?> expected) throws IOException {
            if (actual != expected) {
                throw new IOException("wrong " + role + " class: " + actual + " is not " + expected);
            }
        }
    }

    /**
     * {@link MapOutputCollector} that keeps every partition's records in memory and, on
     * {@link #close()}, copies them unsorted into a single local map-output file together with
     * the matching spill index file.
     *
     * @param <K> map output key type
     * @param <V> map output value type
     */
    static class MapOutputCopier<K, V> implements MapOutputCollector<K, V> {
        /** Buffer size used when creating the final map-output file. */
        private static final int BUF_SIZE = 131072;
        // NOTE(review): presumably the size in bytes of one spill-index entry (three longs:
        // startOffset, rawLength, partLength), i.e. MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH — confirm.
        private static final int INDEX_RECORD_LENGTH = 24;

        private MapTask mapTask;
        private JobConf jobConf;
        private Task.TaskReporter reporter;
        private int numberOfPartitions;
        private Class<K> keyClass;
        private Class<V> valueClass;
        private KeyValueWriter<K, V>[] recordWriters;
        private ByteArrayOutputStream[] outStreams;

        /**
         * Reads the task, job configuration and reporter from {@code context} and allocates one
         * in-memory stream plus one {@link KeyValueWriter} per reduce partition.
         *
         * @param context collector context supplied by the map task
         * @throws IOException if a record writer cannot be created
         */
        @Override
        // The job configuration only exposes Class<?>, and Java cannot create generic arrays;
        // both unchecked conversions are confined to this method.
        @SuppressWarnings("unchecked")
        public void init(MapOutputCollector.Context context) throws IOException, ClassNotFoundException {
            mapTask = context.getMapTask();
            jobConf = context.getJobConf();
            reporter = context.getReporter();
            numberOfPartitions = jobConf.getNumReduceTasks();
            keyClass = (Class<K>) jobConf.getMapOutputKeyClass();
            valueClass = (Class<V>) jobConf.getMapOutputValueClass();
            recordWriters = new KeyValueWriter[numberOfPartitions];
            outStreams = new ByteArrayOutputStream[numberOfPartitions];
            for (int partition = 0; partition < numberOfPartitions; partition++) {
                outStreams[partition] = new ByteArrayOutputStream();
                recordWriters[partition] = new KeyValueWriter<>(jobConf, outStreams[partition], keyClass, valueClass);
            }
        }

        /**
         * Appends one record to the in-memory stream of partition {@code i}.
         *
         * @param k record key
         * @param v record value
         * @param i target partition, {@code 0 <= i < number of reduce tasks}
         * @throws IOException if {@code i} is out of range or serialization fails
         */
        @Override
        public synchronized void collect(K k, V v, int i) throws IOException, InterruptedException {
            if (i < 0 || i >= numberOfPartitions) {
                throw new IOException("Invalid partition number: " + i);
            }
            recordWriters[i].write(k, v);
            reporter.progress();
        }

        /**
         * Finishes all partition writers and writes the final map-output file and its index.
         *
         * @throws IOException if closing a writer or writing the output files fails
         */
        @Override
        public void close() throws IOException, InterruptedException {
            long totalSize = 0;
            for (int partition = 0; partition < numberOfPartitions; partition++) {
                recordWriters[partition].close();
                outStreams[partition].close();
                totalSize += outStreams[partition].size();
            }
            MapOutputFile mapOutputFile = mapTask.getMapOutputFile();
            Path dataFile = mapOutputFile.getOutputFileForWrite(totalSize);
            Path indexFile = mapOutputFile.getOutputIndexFileForWrite(numberOfPartitions * INDEX_RECORD_LENGTH);
            copyPartitions(dataFile, indexFile);
        }

        /** Nothing to do: records stay buffered in memory until {@link #close()}. */
        @Override
        public void flush() throws IOException, InterruptedException, ClassNotFoundException {
        }

        /**
         * Writes every buffered partition to {@code path} through an {@link IFileOutputStream}
         * and records each partition's offset and lengths in the spill index at {@code path2}.
         *
         * @param path map-output data file to create (overwritten if present)
         * @param path2 spill index file to write
         * @throws IOException if either file cannot be written
         */
        private void copyPartitions(Path path, Path path2) throws IOException {
            FileSystem rawLocalFs = FileSystem.getLocal(jobConf).getRaw();
            SpillRecord spillRecord = new SpillRecord(numberOfPartitions);
            IndexRecord indexRecord = new IndexRecord();
            // try-with-resources: the data file must be closed even when a partition write fails.
            try (FSDataOutputStream rawOutput = rawLocalFs.create(path, true, BUF_SIZE)) {
                for (int partition = 0; partition < numberOfPartitions; partition++) {
                    indexRecord.startOffset = rawOutput.getPos();
                    byte[] partitionBytes = outStreams[partition].toByteArray();
                    // Deliberately not closed: it wraps the shared rawOutput, which must stay open
                    // for the remaining partitions. finish() completes this partition's segment.
                    IFileOutputStream segmentOutput = new IFileOutputStream(rawOutput);
                    segmentOutput.write(partitionBytes);
                    segmentOutput.finish();
                    indexRecord.rawLength = partitionBytes.length;
                    // Measured on the file, so it includes whatever finish() appended.
                    indexRecord.partLength = rawOutput.getPos() - indexRecord.startOffset;
                    spillRecord.putIndex(indexRecord, partition);
                    reporter.progress();
                }
            }
            spillRecord.writeToFile(path2, jobConf);
        }
    }

    /**
     * Mapper that splits each input line at its first space and emits the text before the space
     * as the key and the text after it as the value.
     */
    public static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
        // Reused for every record so that no new Text objects are allocated per call.
        private Text keyText = new Text();
        private Text valueText = new Text();

        /**
         * Emits one (key, value) pair for the given line.
         *
         * @param longWritable input key supplied by the input format (not used)
         * @param text the input line; expected to contain a space
         * @param outputCollector receives the split pair
         * @param reporter progress reporter (not used)
         * @throws IOException if the collector fails
         */
        @Override
        public void map(LongWritable longWritable, Text text, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
            final String line = text.toString();
            final int separatorAt = line.indexOf(' ');
            final String keyPart = line.substring(0, separatorAt);
            final String valuePart = line.substring(separatorAt + 1);
            keyText.set(keyPart);
            valueText.set(valuePart);
            outputCollector.collect(keyText, valueText);
        }

        /** No resources to release. */
        @Override
        public void close() throws IOException {
        }
    }

    /**
     * Range partitioner over numeric keys: key {@code n} is mapped to
     * {@code numPartitions * max(0, n - 1) / 1000}, so increasing keys land in non-decreasing
     * partitions. Keys that are not parseable integers are treated as 0.
     */
    static class MyPartitioner implements Partitioner<Text, Text> {
        /** No configuration is needed. */
        @Override
        public void configure(JobConf jobConf) {
        }

        /**
         * Computes the partition for a record.
         *
         * @param text record key, parsed as a decimal integer
         * @param text2 record value (not used)
         * @param i total number of partitions
         * @return the partition computed from the key
         */
        @Override
        public int getPartition(Text text, Text text2, int i) {
            int keyValue = 0;
            try {
                keyValue = Integer.parseInt(text.toString());
            } catch (NumberFormatException ignored) {
                // Non-numeric key: keep the default of 0.
            }
            int zeroBased = keyValue - 1;
            if (zeroBased < 0) {
                zeroBased = 0;
            }
            return i * zeroBased / 1000;
        }
    }

    /**
     * Starts a mini DFS cluster and a mini MR client cluster, generates the input files, runs
     * the merge job and verifies its output. Both clusters are shut down afterwards, whether or
     * not the test body succeeded.
     *
     * @throws Exception if cluster setup, the job, verification or cluster shutdown fails
     */
    public void testMerge() throws Exception {
        MiniDFSCluster miniDFSCluster = null;
        MiniMRClientCluster miniMRClientCluster = null;
        try {
            Configuration configuration = new Configuration();
            miniDFSCluster = new MiniDFSCluster.Builder(configuration).numDataNodes(2).build();
            DistributedFileSystem fileSystem = miniDFSCluster.getFileSystem();
            miniMRClientCluster = MiniMRClientClusterFactory.create(getClass(), 2, configuration);
            createInput(fileSystem);
            runMergeTest(new JobConf(miniMRClientCluster.getConfig()), fileSystem);
        } finally {
            // Cleanup runs exactly once; the nested finally makes sure the MR cluster is still
            // stopped when shutting down the DFS cluster throws.
            try {
                if (miniDFSCluster != null) {
                    miniDFSCluster.shutdown();
                }
            } finally {
                if (miniMRClientCluster != null) {
                    miniMRClientCluster.stop();
                }
            }
        }
    }

    /**
     * Recreates the input directory with 10 files named {@code input_<i>.txt}. Every file holds
     * 1000 lines of the form {@code "<n> <n>"}, where {@code <n>} is the 1-based line number
     * zero-padded to nine digits.
     *
     * @param fileSystem file system in which the input files are created
     * @throws Exception if the directory cannot be deleted or a file cannot be written
     */
    private void createInput(FileSystem fileSystem) throws Exception {
        fileSystem.delete(INPUT_DIR, true);
        for (int fileIndex = 0; fileIndex < 10; fileIndex++) {
            Path inputFile = new Path(INPUT_DIR, "input_" + fileIndex + ".txt");
            // try-with-resources: close the writer (and the underlying stream) even when a
            // write fails part-way through the file.
            // NOTE(review): the writer uses the platform default charset; the content is plain
            // ASCII digits and spaces, so this should not matter in practice.
            try (OutputStreamWriter writer = new OutputStreamWriter(fileSystem.create(inputFile))) {
                for (int lineIndex = 0; lineIndex < 1000; lineIndex++) {
                    String number = String.format("%09d", Integer.valueOf(lineIndex + 1));
                    writer.write(number + " " + number + IOUtils.LINE_SEPARATOR_UNIX);
                }
            }
        }
    }

    /**
     * Configures and runs the merge job with {@link MapOutputCopier} as the map output
     * collector, then verifies the job output.
     *
     * <p>An {@link IOException} from submitting or monitoring the job is only logged, and an
     * interrupt while monitoring only restores the thread's interrupt flag. In every case the
     * output is verified exactly once afterwards, so a failed job surfaces as a verification
     * failure.
     *
     * @param jobConf job configuration to populate and submit
     * @param fileSystem file system holding the input and output directories
     * @throws Exception if the output cannot be read or does not match the expectation
     */
    private void runMergeTest(JobConf jobConf, FileSystem fileSystem) throws Exception {
        fileSystem.delete(OUTPUT, true);
        jobConf.setJobName("MergeTest");
        JobClient jobClient = new JobClient(jobConf);
        FileInputFormat.setInputPaths(jobConf, INPUT_DIR);
        FileOutputFormat.setOutputPath(jobConf, OUTPUT);
        // Same separator the mapper splits on, so output lines read "<key> <value>".
        jobConf.set("mapreduce.output.textoutputformat.separator", " ");
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setMapOutputKeyClass(Text.class);
        jobConf.setMapOutputValueClass(Text.class);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setMapperClass(MyMapper.class);
        jobConf.setPartitionerClass(MyPartitioner.class);
        jobConf.setOutputFormat(TextOutputFormat.class);
        jobConf.setNumReduceTasks(4);
        jobConf.set(MRJobConfig.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputCopier.class.getName());

        // Stays null when submission itself fails.
        RunningJob submittedJob = null;
        try {
            submittedJob = jobClient.submitJob(jobConf);
            try {
                if (!jobClient.monitorAndPrintJob(jobConf, submittedJob)) {
                    throw new IOException("Job failed!");
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
        } catch (IOException e) {
            System.err.println("Job failed with: " + e);
        } finally {
            // Single verification point: it sits outside the catch above, so an IOException
            // raised while reading the output is not mistaken for a job failure.
            verifyOutput(submittedJob, fileSystem);
        }
    }

    /**
     * Reads every job output file and checks that it contains exactly 10000 valid records
     * (10 input files of 1000 lines each) and no invalid ones.
     *
     * <p>A record is valid when it has the form {@code "<key> <value>"}, the key equals the
     * value, and the key is not lexicographically smaller than the previously accepted key
     * (carried across files). Records without a space separator are counted as invalid.
     *
     * @param runningJob the job that produced the output (not used; may be {@code null})
     * @param fileSystem file system holding the output directory
     * @throws Exception if the output cannot be listed or read, or the counts do not match
     */
    private void verifyOutput(RunningJob runningJob, FileSystem fileSystem) throws Exception {
        long validRecords = 0;
        long invalidRecords = 0;
        String previousKey = "000000000";
        Path[] outputFiles = FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT, new Utils.OutputFileUtils.OutputFilesFilter()));
        for (Path outputFile : outputFiles) {
            // try-with-resources closes the stream on both the normal and the failure path.
            try (FSDataInputStream in = fileSystem.open(outputFile)) {
                String record;
                while ((record = in.readLine()) != null) {
                    int separatorAt = record.indexOf(' ');
                    if (separatorAt < 0) {
                        // Malformed line: count it instead of failing inside substring().
                        invalidRecords++;
                        continue;
                    }
                    String key = record.substring(0, separatorAt);
                    String value = record.substring(separatorAt + 1);
                    if (key.compareTo(previousKey) >= 0 && key.equals(value)) {
                        previousKey = key;
                        validRecords++;
                    } else {
                        invalidRecords++;
                    }
                }
            }
        }
        // Every input record must show up in the output, in sorted order ...
        assertEquals(10000L, validRecords);
        // ... and nothing else may be present.
        assertEquals(0L, invalidRecords);
    }
}
