package cascading.flow.hadoop;

import cascading.PlatformTestCase;
import cascading.flow.FailingFlowListener;
import cascading.flow.Flow;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.Flows;
import cascading.flow.LockingFlowListener;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.FlowStepJob;
import cascading.operation.BaseOperation;
import cascading.operation.Debug;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.platform.hadoop.BaseHadoopPlatform;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextLine;
import cascading.stats.CascadingStats;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.Lfs;
import cascading.tuple.Fields;
import cascading.util.Util;
import data.InputData;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/flow/hadoop/FlowPlatformTest.class */
public class FlowPlatformTest extends PlatformTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(FlowPlatformTest.class);

    /* loaded from: input_file:cascading/flow/hadoop/FlowPlatformTest$BadFilter.class */
    private static class BadFilter extends BaseOperation implements Filter {
        private Object object;

        private BadFilter() {
            this.object = new Object();
        }

        public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
            return false;
        }
    }

    public FlowPlatformTest() {
        super(true);
    }

    @Test
    public void testLocalModeSource() throws Exception {
        List flowSteps = getPlatform().getFlowConnector().connect(new Lfs(new TextLine(), "input/path"), new Hfs(new TextLine(), "output/path", SinkMode.REPLACE), new Pipe("test")).getFlowSteps();
        assertEquals("wrong size", 1, flowSteps.size());
        assertTrue("is not local", HadoopUtil.isLocal((Configuration) ((FlowStep) flowSteps.get(0)).getConfig()));
    }

    @Test
    public void testLocalModeSink() throws Exception {
        List flowSteps = getPlatform().getFlowConnector().connect(new Hfs(new TextLine(), "input/path"), new Lfs(new TextLine(), "output/path", SinkMode.REPLACE), new Pipe("test")).getFlowSteps();
        assertEquals("wrong size", 1, flowSteps.size());
        assertTrue("is not local", HadoopUtil.isLocal((Configuration) ((FlowStep) flowSteps.get(0)).getConfig()));
    }

    @Test
    public void testNotLocalMode() throws Exception {
        if (getPlatform().isUseCluster()) {
            Flow connect = getPlatform().getFlowConnector().connect(new Hfs(new TextLine(), "input/path"), new Hfs(new TextLine(), "output/path", SinkMode.REPLACE), new Pipe("test"));
            List flowSteps = connect.getFlowSteps();
            assertEquals("wrong size", 1, flowSteps.size());
            assertTrue("is local", !HadoopUtil.isLocal((Configuration) ((FlowStep) flowSteps.get(0)).createInitializedConfig(connect.getFlowProcess(), ((BaseHadoopPlatform) getPlatform()).getConfiguration())));
        }
    }

    @Test
    public void testStop() throws Exception {
        if (getPlatform().isUseCluster()) {
            getPlatform().copyFromLocal(InputData.inputFileLower);
            getPlatform().copyFromLocal(InputData.inputFileUpper);
            Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileLower);
            Hfs hfs2 = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileUpper);
            HashMap hashMap = new HashMap();
            hashMap.put("lower", hfs);
            hashMap.put("upper", hfs2);
            RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
            Flow connect = getPlatform().getFlowConnector(getProperties()).connect(hashMap, new Hfs(new TextLine(), getOutputPath("stopped"), SinkMode.REPLACE), new CoGroup(new GroupBy(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"})), new Fields(new Comparable[]{"num"}), new GroupBy(new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"})), new Fields(new Comparable[]{"num"}), Fields.size(4)));
            LockingFlowListener lockingFlowListener = new LockingFlowListener();
            connect.addListener(lockingFlowListener);
            LOG.info("calling start");
            connect.start();
            Util.safeSleep(5000L);
            assertTrue("did not start", lockingFlowListener.started.tryAcquire(60L, TimeUnit.SECONDS));
            while (true) {
                LOG.info("testing if running");
                Thread.sleep(1000L);
                Map jobsMap = Flows.getJobsMap(connect);
                if (jobsMap != null && jobsMap.values().size() != 0) {
                    FlowStepJob flowStepJob = (FlowStepJob) jobsMap.values().iterator().next();
                    if (flowStepJob.getStepStats().getStatus() == CascadingStats.Status.FAILED) {
                        fail("failed to start Hadoop step, please check your environment.");
                    }
                    if (flowStepJob.isStarted()) {
                        break;
                    }
                }
            }
            Semaphore semaphore = new Semaphore(0);
            long nanoTime = System.nanoTime();
            Future submit = Executors.newSingleThreadExecutor().submit(() -> {
                semaphore.release();
                LOG.info("calling complete");
                connect.complete();
                return Long.valueOf(System.nanoTime() - nanoTime);
            });
            semaphore.acquire();
            LOG.info("calling stop");
            connect.stop();
            long nanoTime2 = System.nanoTime() - nanoTime;
            long longValue = ((Long) submit.get()).longValue();
            assertTrue(String.format("stop: %s complete: %s", Long.valueOf(nanoTime2), Long.valueOf(longValue)), nanoTime2 <= longValue);
            assertTrue("did not stop", lockingFlowListener.stopped.tryAcquire(60L, TimeUnit.SECONDS));
            assertTrue("did not complete", lockingFlowListener.completed.tryAcquire(60L, TimeUnit.SECONDS));
        }
    }

    @Test
    public void testFailedSerialization() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", hfs);
        try {
            getPlatform().getFlowConnector(getProperties()).connect(hashMap, new Hfs(new TextLine(), getOutputPath("badserialization"), SinkMode.REPLACE), new GroupBy(new Each(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ")), new Fields(new Comparable[]{"num"}), new BadFilter()), new Fields(new Comparable[]{"num"})));
            fail("did not throw serialization exception");
        } catch (Exception e) {
        }
    }

    @Test
    public void testStartStopRace() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", hfs);
        Flow connect = getPlatform().getFlowConnector(getProperties()).connect(hashMap, new Hfs(new TextLine(), getOutputPath("startstop"), SinkMode.REPLACE), new GroupBy(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ")), new Fields(new Comparable[]{"num"})));
        connect.start();
        connect.stop();
    }

    @Test
    public void testFailingListenerStarting() throws Exception {
        failingListenerTest(FailingFlowListener.OnFail.STARTING);
    }

    @Test
    public void testFailingListenerStopping() throws Exception {
        failingListenerTest(FailingFlowListener.OnFail.STOPPING);
    }

    @Test
    public void testFailingListenerCompleted() throws Exception {
        failingListenerTest(FailingFlowListener.OnFail.COMPLETED);
    }

    @Test
    public void testFailingListenerThrowable() throws Exception {
        failingListenerTest(FailingFlowListener.OnFail.THROWABLE);
    }

    private void failingListenerTest(FailingFlowListener.OnFail onFail) throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileLower);
        Hfs hfs2 = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", hfs);
        hashMap.put("upper", hfs2);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Hfs hfs3 = new Hfs(new TextLine(), getOutputPath(onFail + "/stopped"), SinkMode.REPLACE);
        Pipe each = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter);
        if (onFail == FailingFlowListener.OnFail.THROWABLE) {
            each = new Each(each, new Debug() { // from class: cascading.flow.hadoop.FlowPlatformTest.1
                public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
                    throw new RuntimeException("failing inside pipe assembly intentionally");
                }
            });
        }
        Flow connect = getPlatform().getFlowConnector(getProperties()).connect(hashMap, hfs3, new CoGroup(new GroupBy(each, new Fields(new Comparable[]{"num"})), new Fields(new Comparable[]{"num"}), new GroupBy(new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"num"})), new Fields(new Comparable[]{"num"}), Fields.size(4)));
        FailingFlowListener failingFlowListener = new FailingFlowListener(onFail);
        connect.addListener(failingFlowListener);
        LOG.info("calling start");
        connect.start();
        assertTrue("did not start", failingFlowListener.started.tryAcquire(120L, TimeUnit.SECONDS));
        if (onFail == FailingFlowListener.OnFail.STOPPING) {
            while (true) {
                LOG.info("testing if running");
                Thread.sleep(1000L);
                Map jobsMap = Flows.getJobsMap(connect);
                if (jobsMap != null && jobsMap.values().size() != 0) {
                    FlowStepJob flowStepJob = (FlowStepJob) jobsMap.values().iterator().next();
                    if (flowStepJob.getStepStats().getStatus() == CascadingStats.Status.FAILED) {
                        fail("failed to start Hadoop step, please check your environment.");
                    }
                    if (flowStepJob.isStarted()) {
                        break;
                    }
                }
            }
            LOG.info("calling stop");
            connect.stop();
        }
        assertTrue("did not complete", failingFlowListener.completed.tryAcquire(360L, TimeUnit.SECONDS));
        assertTrue("did not stop", failingFlowListener.stopped.tryAcquire(360L, TimeUnit.SECONDS));
        try {
            connect.complete();
            fail("did not rethrow exception from listener");
        } catch (Exception e) {
        }
    }

    @Test
    public void testFlowID() throws Exception {
        Lfs lfs = new Lfs(new TextLine(), "input/path");
        Hfs hfs = new Hfs(new TextLine(), "output/path", SinkMode.REPLACE);
        Pipe pipe = new Pipe("test");
        Map properties = getProperties();
        Flow connect = getPlatform().getFlowConnector(properties).connect(lfs, hfs, pipe);
        assertNotNull("missing id", connect.getID());
        assertNotNull("missing id in conf", connect.getProperty("cascading.flow.id"));
        assertTrue("same id", !connect.getID().equalsIgnoreCase(getPlatform().getFlowConnector(properties).connect(lfs, hfs, pipe).getID()));
    }

    @Test
    public void testCopyConfig() throws Exception {
        Lfs lfs = new Lfs(new TextLine(), "input/path");
        Hfs hfs = new Hfs(new TextLine(), "output/path", SinkMode.REPLACE);
        Pipe pipe = new Pipe("test");
        Configuration configuration = ((BaseHadoopPlatform) getPlatform()).getConfiguration();
        configuration.set("cascading.app.name", "testname");
        Flow connect = getPlatform().getFlowConnector(AppProps.appProps().setVersion("1.2.3").buildProperties(configuration)).connect(lfs, hfs, pipe);
        assertEquals("testname", connect.getProperty("cascading.app.name"));
        assertEquals("1.2.3", connect.getProperty("cascading.app.version"));
    }

    @Test
    public void testStartWithoutComplete() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileLower);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", hfs);
        Flow connect = getPlatform().getFlowConnector(getProperties()).connect(hashMap, new Hfs(new TextLine(), getOutputPath("withoutcomplete"), SinkMode.REPLACE), new GroupBy(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ")), new Fields(new Comparable[]{"num"})));
        LockingFlowListener lockingFlowListener = new LockingFlowListener();
        connect.addListener(lockingFlowListener);
        connect.start();
        assertTrue(lockingFlowListener.completed.tryAcquire(90L, TimeUnit.SECONDS));
    }

    @Test
    public void testFailOnMissingSuccessFlowListener() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        FailOnMissingSuccessFlowListener failOnMissingSuccessFlowListener = new FailOnMissingSuccessFlowListener();
        Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileLower);
        Hfs hfs2 = new Hfs(new TextLine(), getOutputPath("withsuccess"), SinkMode.REPLACE);
        Hfs hfs3 = new Hfs(new TextLine(), getOutputPath("withoutsuccess"), SinkMode.REPLACE);
        Hfs hfs4 = new Hfs(new TextLine(), getOutputPath("final"), SinkMode.REPLACE);
        Flow connect = getPlatform().getFlowConnector(getProperties()).connect(hfs, hfs2, new Pipe("lower"));
        connect.addListener(failOnMissingSuccessFlowListener);
        connect.complete();
        Flow connect2 = getPlatform().getFlowConnector(getProperties()).connect(hfs2, hfs3, new Pipe("lower"));
        connect2.addListener(failOnMissingSuccessFlowListener);
        connect2.complete();
        assertTrue(new Hfs(new TextLine(), new Path(hfs3.getPath(), "_SUCCESS").toString()).deleteResource(getPlatform().getFlowProcess()));
        Flow connect3 = getPlatform().getFlowConnector(getProperties()).connect(hfs3, hfs4, new Pipe("lower"));
        connect3.addListener(failOnMissingSuccessFlowListener);
        try {
            connect3.complete();
            fail("listener did not fail flow");
        } catch (FlowException e) {
        }
    }
}
