package cascading.flow.hadoop;

import cascading.PlatformTestCase;
import cascading.flow.hadoop.planner.HadoopPlanner;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.util.Util;
import data.InputData;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.junit.Test;

/* loaded from: input_file:cascading/flow/hadoop/MultiMapReduceFlowPlatformTest.class */
public class MultiMapReduceFlowPlatformTest extends PlatformTestCase {
    public MultiMapReduceFlowPlatformTest() {
        super(true);
    }

    @Test
    public void testFlow() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        String outputPath = getOutputPath("flowTest1");
        String outputPath2 = getOutputPath("flowTest2");
        String outputPath3 = getOutputPath("flowTest3");
        remove(outputPath, true);
        remove(outputPath2, true);
        remove(outputPath3, true);
        JobConf jobConf = (JobConf) getPlatform().getConfiguration();
        MultiMapReduceFlow multiMapReduceFlow = new MultiMapReduceFlow("mrflow", createJob(jobConf, "mr1", InputData.inputFileApache, outputPath), new JobConf[]{createJob(jobConf, "mr2", outputPath, outputPath2), createJob(jobConf, "mr3", outputPath2, outputPath3)});
        validateLength(new Hfs(new TextLine(), InputData.inputFileApache).openForRead(new HadoopFlowProcess(jobConf)), 10);
        multiMapReduceFlow.complete();
        validateLength(new Hfs(new TextLine(), outputPath).openForRead(new HadoopFlowProcess(jobConf)), 10);
        Collection values = multiMapReduceFlow.getSinks().values();
        assertEquals(1, values.size());
        String identifier = ((Tap) values.iterator().next()).getIdentifier();
        assertEquals("flowTest3", identifier.substring(identifier.lastIndexOf(47) + 1));
    }

    @Test
    public void testFlowLazy() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        String outputPath = getOutputPath("flowTest1");
        String outputPath2 = getOutputPath("flowTest2");
        String outputPath3 = getOutputPath("flowTest3");
        remove(outputPath, true);
        remove(outputPath2, true);
        remove(outputPath3, true);
        JobConf jobConf = (JobConf) getPlatform().getConfiguration();
        JobConf createJob = createJob(jobConf, "mr1", InputData.inputFileApache, outputPath);
        JobConf createJob2 = createJob(jobConf, "mr2", outputPath, outputPath2);
        JobConf createJob3 = createJob(jobConf, "mr3", outputPath2, outputPath3);
        validateLength(new Hfs(new TextLine(), InputData.inputFileApache).openForRead(new HadoopFlowProcess(jobConf)), 10);
        MultiMapReduceFlow multiMapReduceFlow = new MultiMapReduceFlow("mrflow", createJob, new JobConf[0]);
        multiMapReduceFlow.start();
        Util.safeSleep(3000L);
        multiMapReduceFlow.attachFlowStep(createJob2);
        Util.safeSleep(3000L);
        multiMapReduceFlow.attachFlowStep(createJob3);
        multiMapReduceFlow.complete();
        validateLength(new Hfs(new TextLine(), outputPath).openForRead(new HadoopFlowProcess(jobConf)), 10);
        Collection values = multiMapReduceFlow.getSinks().values();
        assertEquals(1, values.size());
        String identifier = ((Tap) values.iterator().next()).getIdentifier();
        assertEquals("flowTest3", identifier.substring(identifier.lastIndexOf(47) + 1));
    }

    @Test(expected = IllegalStateException.class)
    public void testFlowLazyFail() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        String outputPath = getOutputPath("flowTest1");
        String outputPath2 = getOutputPath("flowTest2");
        remove(outputPath, true);
        remove(outputPath2, true);
        JobConf jobConf = (JobConf) getPlatform().getConfiguration();
        JobConf createJob = createJob(jobConf, "mr1", InputData.inputFileApache, outputPath);
        JobConf createJob2 = createJob(jobConf, "mr2", outputPath, outputPath2);
        validateLength(new Hfs(new TextLine(), InputData.inputFileApache).openForRead(new HadoopFlowProcess(jobConf)), 10);
        MultiMapReduceFlow multiMapReduceFlow = new MultiMapReduceFlow("mrflow", createJob, new JobConf[0]);
        multiMapReduceFlow.complete();
        multiMapReduceFlow.attachFlowStep(createJob2);
    }

    protected JobConf createJob(JobConf jobConf, String str, String str2, String str3) {
        JobConf jobConf2 = new JobConf(jobConf);
        jobConf2.setJobName(str);
        jobConf2.setOutputKeyClass(LongWritable.class);
        jobConf2.setOutputValueClass(Text.class);
        jobConf2.setMapperClass(IdentityMapper.class);
        jobConf2.setReducerClass(IdentityReducer.class);
        jobConf2.setInputFormat(TextInputFormat.class);
        jobConf2.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(jobConf2, new Path[]{new Path(str2)});
        FileOutputFormat.setOutputPath(jobConf2, new Path(str3));
        return jobConf2;
    }

    private String remove(String str, boolean z) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(str), HadoopPlanner.createJobConf(getProperties()));
        if (z) {
            fileSystem.delete(new Path(str), true);
        }
        return str;
    }
}
