package cascading.tap.hadoop;

import cascading.PlatformTestCase;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;
import cascading.flow.FlowConnectorProps;
import cascading.flow.FlowProcess;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexParser;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.platform.hadoop.BaseHadoopPlatform;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.scheme.hadoop.SequenceFile;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.MultiSourceTap;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.partition.DelimitedPartition;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import data.InputData;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/tap/hadoop/HadoopTapPlatformTest.class */
public class HadoopTapPlatformTest extends PlatformTestCase implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopTapPlatformTest.class);

    /* loaded from: input_file:cascading/tap/hadoop/HadoopTapPlatformTest$CommentScheme.class */
    /**
     * A {@link TextLine} scheme that hides comment lines from the flow: any record whose
     * value matches {@code ^\s*#.*$} is skipped when sourcing.
     */
    public class CommentScheme extends TextLine {
        public CommentScheme(Fields fields) {
            super(fields);
        }

        /**
         * Advances the underlying reader to the next record that is not a comment line and
         * hands it to {@code sourceHandleInput}.
         *
         * @param flowProcess the current flow process (not consulted here)
         * @param sourceCall  supplies the record reader and the key/value holders in its context
         * @return {@code true} if a non-comment record was read, {@code false} once the reader is exhausted
         * @throws IOException if the underlying reader fails
         */
        public boolean source(FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
            Object[] context = sourceCall.getContext();
            RecordReader reader = sourceCall.getInput();

            // Iterate instead of recursing: the previous implementation called itself once per
            // comment line, so a long run of consecutive comments grew the stack without bound.
            while (reader.next(context[0], context[1])) {
                boolean isComment = context[1].toString().matches("^\\s*#.*$");
                if (!isComment) {
                    sourceHandleInput(sourceCall);
                    return true;
                }
            }
            return false;
        }
    }

    /* loaded from: input_file:cascading/tap/hadoop/HadoopTapPlatformTest$DupeConfigScheme.class */
    /**
     * A {@link TextLine} scheme that detects whether its source configuration is initialized
     * more than once against the same {@link Configuration}, using the marker property
     * {@code this.is.a.dupe}.
     */
    public class DupeConfigScheme extends TextLine {
        public DupeConfigScheme(Fields fields) {
            super(fields);
        }

        /**
         * Sets the marker property on the given configuration, failing if it is already present.
         *
         * @throws IllegalStateException if {@code this.is.a.dupe} was already set on {@code configuration}
         */
        public void sourceConfInit(FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration configuration) {
            if (configuration.get("this.is.a.dupe") != null) {
                throw new IllegalStateException("has dupe config value");
            }
            configuration.set("this.is.a.dupe", "dupe");
            super.sourceConfInit(flowProcess, tap, configuration);
        }

        /**
         * Verifies the marker property is visible through the flow process before sourcing begins.
         *
         * @throws IllegalStateException if {@code this.is.a.dupe} is not readable from {@code flowProcess}
         */
        public void sourcePrepare(FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
            if (flowProcess.getStringProperty("this.is.a.dupe") == null) {
                throw new IllegalStateException("has no dupe config value");
            }
            super.sourcePrepare(flowProcess, sourceCall);
        }

        // NOTE: the explicit sourceConfInit(FlowProcess, Tap, Object) overload that used to live
        // here was a decompiler rendering of the compiler's synthetic bridge method. The compiler
        // generates that bridge on its own, so it must not be declared in source.
    }

    /* loaded from: input_file:cascading/tap/hadoop/HadoopTapPlatformTest$ResolvedScheme.class */
    /**
     * A {@link TextLine} scheme that asserts, both when the sink is prepared and on every
     * sunk tuple, that the outgoing entry carries exactly the expected fields.
     */
    public class ResolvedScheme extends TextLine {
        private final Fields expectedFields;

        /**
         * @param fields the fields every outgoing entry must declare; note they are only
         *               remembered for comparison and are not passed to the parent constructor
         */
        public ResolvedScheme(Fields fields) {
            this.expectedFields = fields;
        }

        public void sinkPrepare(FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
            requireExpectedFields(sinkCall);
            super.sinkPrepare(flowProcess, sinkCall);
        }

        public void sink(FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
            requireExpectedFields(sinkCall);
            super.sink(flowProcess, sinkCall);
        }

        /** Throws a {@link RuntimeException} unless the outgoing entry's fields equal the expected ones. */
        private void requireExpectedFields(SinkCall<Object[], OutputCollector> sinkCall) {
            Fields outgoing = sinkCall.getOutgoingEntry().getFields();
            if (outgoing.equals(this.expectedFields)) {
                return;
            }
            throw new RuntimeException("fields to not match, expect: " + this.expectedFields + ", found: " + outgoing);
        }
    }

    /**
     * Creates the test case, passing {@code true} to the {@code PlatformTestCase} constructor.
     * NOTE(review): the flag presumably requests cluster mode - confirm against PlatformTestCase.
     */
    public HadoopTapPlatformTest() {
        super(true);
    }

    /**
     * Checks that a relative {@code Dfs} path does not resolve to the {@code file} scheme,
     * that {@code hdfs://} identifiers are accepted, and that {@code s3://} identifiers are
     * rejected. Does nothing unless the platform uses a cluster with HDFS available.
     */
    @Test
    public void testDfs() throws URISyntaxException, IOException {
        if (!getPlatform().isUseCluster()) {
            return;
        }
        if (!((BaseHadoopPlatform) getPlatform()).isHDFSAvailable()) {
            LOG.warn("skipped Dfs tests, HDFS is unavailable on current platform");
            return;
        }

        Dfs relativeTap = new Dfs(new SequenceFile(new Fields("foo")), "some/path");
        String fullIdentifier = relativeTap.getFullIdentifier(getPlatform().getFlowProcess());
        String uriScheme = new Path(fullIdentifier).toUri().getScheme();
        assertFalse("wrong scheme", uriScheme.equalsIgnoreCase("file"));

        // both the String and the URI constructors must accept an hdfs identifier
        new Dfs(new SequenceFile(new Fields("foo")), "hdfs://localhost:5001/some/path");
        new Dfs(new SequenceFile(new Fields("foo")), new URI("hdfs://localhost:5001/some/path"));

        try {
            new Dfs(new SequenceFile(new Fields("foo")), "s3://localhost:5001/some/path");
            fail("not valid url");
        } catch (Exception ignored) {
            // expected: a non-hdfs scheme is rejected
        }

        try {
            new Dfs(new SequenceFile(new Fields("foo")), new URI("s3://localhost:5001/some/path"));
            fail("not valid url");
        } catch (Exception ignored) {
            // expected: a non-hdfs scheme is rejected
        }
    }

    /**
     * Checks that a relative {@code Lfs} path resolves to the {@code file} scheme, that a
     * {@code file://} identifier is accepted, and that an {@code s3://} identifier is rejected.
     */
    @Test
    public void testLfs() throws URISyntaxException, IOException {
        Lfs relativeTap = new Lfs(new SequenceFile(new Fields("foo")), "some/path");
        String fullIdentifier = relativeTap.getFullIdentifier(getPlatform().getFlowProcess());
        String uriScheme = new Path(fullIdentifier).toUri().getScheme();
        assertTrue("wrong scheme", uriScheme.equalsIgnoreCase("file"));

        new Lfs(new SequenceFile(new Fields("foo")), "file:///some/path");

        try {
            new Lfs(new SequenceFile(new Fields("foo")), "s3://localhost:5001/some/path");
            fail("not valid url");
        } catch (Exception ignored) {
            // expected: a non-file scheme is rejected
        }
    }

    /**
     * Runs an identity flow over the comments input file using {@code CommentScheme} and
     * checks that five tuples reach the sink, that the first sunk line is {@code "1 a"},
     * and that reading the source also yields five tuples.
     */
    @Test
    public void testNullsFromScheme() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileComments);

        Hfs source = new Hfs(new CommentScheme(new Fields("line")), InputData.inputFileComments);
        Hfs sink = new Hfs(new TextLine(1), getOutputPath("testnulls"), SinkMode.REPLACE);
        Pipe pipe = new Each(new Pipe("test"), new Identity());

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(source, sink, pipe);
        flow.complete();
        validateLength(flow, 5, null);

        TupleEntryIterator iterator = flow.openSink();
        try {
            assertEquals("not equal: tuple.get(1)", "1 a", ((TupleEntry) iterator.next()).getObject(1));
        } finally {
            // close even when the assertion fails so the sink stream is not leaked
            iterator.close();
        }

        validateLength(flow.openSource(), 5);
    }

    /**
     * Splits each input line into {@code num} and {@code char} and sinks through
     * {@code ResolvedScheme}, which fails the flow if the outgoing fields differ from
     * those two. Then checks the five expected tab-separated lines were written.
     */
    @Test
    public void testResolvedSinkFields() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);

        Hfs source = new Hfs(new TextLine(new Fields("line")), InputData.inputFileLower);
        Each splitter = new Each(new Pipe("test"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), " "));
        Hfs sink = new Hfs(new ResolvedScheme(new Fields("num", "char")), getOutputPath("resolvedfields"), SinkMode.REPLACE);

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(source, sink, splitter);
        flow.complete();

        List<?> sinkTuples = asList(flow, sink);
        List<Object> sunkLines = new ArrayList<Object>();
        for (Object sinkTuple : sinkTuples) {
            // position 1 holds the line text
            sunkLines.add(((Tuple) sinkTuple).getObject(1));
        }

        assertTrue(sunkLines.contains("1\ta"));
        assertTrue(sunkLines.contains("2\tb"));
        assertTrue(sunkLines.contains("3\tc"));
        assertTrue(sunkLines.contains("4\td"));
        assertTrue(sunkLines.contains("5\te"));
        assertEquals(5, sinkTuples.size());
        assertEquals(5, asList(flow, source).size());
    }

    /**
     * Checks how many child taps two glob patterns resolve to, then runs a two-flow cascade
     * (glob source to "glob", "glob" to "glob2") and validates the first flow yields 10 tuples.
     */
    @Test
    public void testGlobHfs() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        GlobHfs source = new GlobHfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "?{ppe[_r],owe?}.txt");
        assertEquals(2, source.getTaps().length);

        GlobHfs directoryGlob = new GlobHfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "../?ata/");
        assertEquals(1, directoryGlob.getTaps().length);

        Hfs intermediate = new Hfs(new TextLine(), getOutputPath("glob"), SinkMode.REPLACE);
        Each concat = new Each(new Pipe("concat"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), "\\s"));

        Flow firstFlow = getPlatform().getFlowConnector(getProperties()).connect("first", source, intermediate, concat);

        // the second flow reads what the first one writes, so the cascade must order them
        Hfs finalSink = new Hfs(new TextLine(), getOutputPath("glob2"), SinkMode.REPLACE);
        Flow secondFlow = getPlatform().getFlowConnector(getProperties()).connect("second", intermediate, finalSink, concat);

        new CascadeConnector(getProperties()).connect(new Flow[]{firstFlow, secondFlow}).complete();

        validateLength(firstFlow, 10);
    }

    /**
     * Wraps two {@code GlobHfs} taps in a {@link MultiSourceTap}, checks it reports two child
     * taps, then runs a two-flow cascade and validates the first flow yields 10 tuples.
     */
    @Test
    public void testNestedMultiSourceGlobHfs() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        GlobHfs firstGlob = new GlobHfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "?{ppe[_r]}.txt");
        GlobHfs secondGlob = new GlobHfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "?{owe?}.txt");
        MultiSourceTap source = new MultiSourceTap(new Tap[]{firstGlob, secondGlob});
        assertEquals(2L, source.getNumChildTaps());

        Hfs intermediate = new Hfs(new TextLine(), getOutputPath("globmultisource"), SinkMode.REPLACE);
        Each concat = new Each(new Pipe("concat"), new Fields("line"), new RegexSplitter(new Fields("num", "char"), "\\s"));

        Flow firstFlow = getPlatform().getFlowConnector(getProperties()).connect("first", source, intermediate, concat);

        // the second flow consumes the first flow's sink
        Hfs finalSink = new Hfs(new TextLine(), getOutputPath("globmultiource2"), SinkMode.REPLACE);
        Flow secondFlow = getPlatform().getFlowConnector(getProperties()).connect("second", intermediate, finalSink, concat);

        new CascadeConnector(getProperties()).connect(new Flow[]{firstFlow, secondFlow}).complete();

        validateLength(firstFlow, 10);
    }

    /**
     * Opens a {@link MultiSourceTap} for reading directly, first over two separate globs and
     * then over a single combined glob, and validates that each yields 10 tuples.
     */
    @Test
    public void testMultiSourceIterator() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        GlobHfs firstGlob = new GlobHfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "?{ppe[_r]}.txt");
        GlobHfs secondGlob = new GlobHfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "?{owe?}.txt");
        MultiSourceTap twoGlobs = new MultiSourceTap(new Tap[]{firstGlob, secondGlob});
        validateLength(twoGlobs.openForRead(getPlatform().getFlowProcess()), 10);

        GlobHfs combinedGlob = new GlobHfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "?{ppe[_r],owe?}.txt");
        MultiSourceTap oneGlob = new MultiSourceTap(new Tap[]{combinedGlob});
        validateLength(oneGlob.openForRead(getPlatform().getFlowProcess()), 10, null);
    }

    /**
     * Runs a count-by-ip flow into a sink whose {@code commitResource} is overridden to count
     * invocations, and checks it was called exactly once and the flow output validates at length 8.
     */
    @Test
    public void testCommitResource() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        // single-element array so the anonymous subclass can mutate the counter
        final int[] commitCalls = {0};

        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileApache);
        Tap sink = new Hfs(new TextDelimited(Fields.ALL), getOutputPath("committap"), SinkMode.REPLACE) {
            public boolean commitResource(Configuration configuration) throws IOException {
                commitCalls[0]++;
                return true;
            }
        };

        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
        pipe = new GroupBy(pipe, new Fields("ip"));
        pipe = new Every(pipe, new Count(), new Fields("ip", "count"));

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        assertEquals(1, commitCalls[0]);
        validateLength(flow, 8, null);
    }

    /**
     * Runs a count-by-ip flow into a sink whose {@code commitResource} always throws, and
     * expects completing the flow to raise an exception.
     */
    @Test
    public void testCommitResourceFails() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        try {
            Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileApache);
            Tap sink = new Hfs(new TextDelimited(Fields.ALL), getOutputPath("committapfail"), SinkMode.REPLACE) {
                public boolean commitResource(Configuration configuration) throws IOException {
                    throw new IOException("failed intentionally");
                }
            };

            Pipe pipe = new Pipe("test");
            pipe = new Each(pipe, new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
            pipe = new GroupBy(pipe, new Fields("ip"));
            pipe = new Every(pipe, new Count(), new Fields("ip", "count"));

            getPlatform().getFlowConnector().connect(source, sink, pipe).complete();
            fail();
        } catch (Exception ignored) {
            // expected: the intentional commit failure surfaces as an exception
        }
    }

    /**
     * Checks that an {@code Hfs} tap over {@code inputPath + "*"} exists and yields at least
     * one tuple, and that opening a tap over a {@code /blah/} sub-path throws {@link IOException}.
     */
    @Test
    public void testHfsAsterisk() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Hfs source = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "*");
        assertTrue(source.resourceExists(getPlatform().getFlowProcess()));

        TupleEntryIterator iterator = source.openForRead(getPlatform().getFlowProcess());
        try {
            assertTrue(iterator.hasNext());
        } finally {
            // close even when the assertion fails so the input stream is not leaked
            iterator.close();
        }

        try {
            new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "/blah/").openForRead(getPlatform().getFlowProcess());
            fail();
        } catch (IOException ignored) {
            // expected: the path cannot be opened for reading
        }
    }

    /**
     * Checks that an {@code Hfs} tap over {@code inputPath + "{*}"} exists and yields at least
     * one tuple, and that opening a tap over a {@code /blah/} sub-path throws {@link IOException}.
     */
    @Test
    public void testHfsBracketAsterisk() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Hfs source = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "{*}");
        assertTrue(source.resourceExists(getPlatform().getFlowProcess()));

        TupleEntryIterator iterator = source.openForRead(getPlatform().getFlowProcess());
        try {
            assertTrue(iterator.hasNext());
        } finally {
            // close even when the assertion fails so the input stream is not leaked
            iterator.close();
        }

        try {
            new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputPath + "/blah/").openForRead(getPlatform().getFlowProcess());
            fail();
        } catch (IOException ignored) {
            // expected: the path cannot be opened for reading
        }
    }

    /**
     * Hash-joins the lower and upper input files on {@code num}, with the upper side read
     * through {@code DupeConfigScheme} (which throws if its source configuration is
     * initialized twice on one configuration). Validates 5 output tuples and two sample rows.
     */
    @Test
    public void testDupeConfigFromScheme() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap lowerSource = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap upperSource = getPlatform().getTap(new DupeConfigScheme(new Fields("offset", "line")), InputData.inputFileUpper, SinkMode.KEEP);

        HashMap<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", lowerSource);
        sources.put("upper", upperSource);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("dupeconfig"), SinkMode.REPLACE);

        RegexSplitter splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Pipe lowerPipe = new Each(new Pipe("lower"), new Fields("line"), splitter);
        Pipe upperPipe = new Each(new Pipe("upper"), new Fields("line"), splitter);
        Pipe join = new HashJoin(lowerPipe, new Fields("num"), upperPipe, new Fields("num"), Fields.size(4));

        // the connector is built with accumulated-tap decoration switched off
        Flow flow = getPlatform()
            .getFlowConnector(FlowConnectorProps.flowConnectorProps().setEnableDecorateAccumulatedTap(false).buildProperties(getProperties()))
            .connect(sources, sink, join);
        flow.complete();

        validateLength(flow, 5);

        List<?> results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(results.contains(new Tuple("2\tb\t2\tB")));
    }

    /**
     * Runs a count-by-ip flow whose source tap overrides {@code sourceConfInit} with an empty
     * body, and expects completing the flow to raise an exception.
     */
    @Test
    public void testMissingInputFormat() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        try {
            // Only the typed override is declared; the decompiled source also spelled out the
            // compiler's synthetic bridge (FlowProcess, Object), which javac generates itself.
            Tap source = new Hfs(new TextDelimited(new Fields("offset", "line")), InputData.inputFileApache) {
                public void sourceConfInit(FlowProcess<? extends Configuration> flowProcess, Configuration configuration) {
                    // intentionally empty: skips the parent's source configuration
                }
            };
            Tap sink = new Hfs(new TextDelimited(Fields.ALL), getOutputPath("missinginputformat"), SinkMode.REPLACE);

            Pipe pipe = new Pipe("test");
            pipe = new Each(pipe, new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
            pipe = new GroupBy(pipe, new Fields("ip"));
            pipe = new Every(pipe, new Count(), new Fields("ip", "count"));

            getPlatform().getFlowConnector().connect(source, sink, pipe).complete();
            fail("did not test for missing input format");
        } catch (Exception ignored) {
            // expected: the flow cannot run without the source configuration
        }
    }

    /**
     * Exercises {@code Hfs.getChildIdentifiers} at several depths for a missing path, an empty
     * directory, a directory with two files, the root, the current directory, and a directory
     * that also contains a dot-prefixed file. Does nothing unless the platform uses a cluster.
     */
    @Test
    public void testChildIdentifiers() throws Exception {
        if (!getPlatform().isUseCluster()) {
            return;
        }

        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Configuration conf = ((BaseHadoopPlatform) getPlatform()).getConfiguration();

        Hfs multiFiles = new Hfs(new TextLine(new Fields("offset", "line")), getOutputPath("multifiles"));
        multiFiles.deleteResource(getPlatform().getFlowProcess());
        assertChildIdentifierCounts("missing", multiFiles, conf, 0, 0);

        multiFiles.createResource(getPlatform().getFlowProcess());
        assertChildIdentifierCounts("no children", multiFiles, conf, 0, 1);

        writeFileTo("multifiles/A");
        writeFileTo("multifiles/B");
        assertChildIdentifierCounts("children", multiFiles, conf, 2, 1);

        // -1 tells assertEqualsSize to skip the size comparison
        Hfs root = new Hfs(new TextLine(new Fields("offset", "line")), "/");
        assertChildIdentifierCounts("root", root, conf, -1, 1);

        Hfs current = new Hfs(new TextLine(new Fields("offset", "line")), "./");
        assertChildIdentifierCounts("current", current, conf, -1, -1);

        Hfs hiddenFiles = new Hfs(new TextLine(new Fields("offset", "line")), getOutputPath("hiddenfiles"));
        writeFileTo("hiddenfiles/A");
        writeFileTo("hiddenfiles/B");
        writeFileTo("hiddenfiles/.hidden");
        assertChildIdentifierCounts("children", hiddenFiles, conf, 2, 1);
    }

    /**
     * Queries child identifiers with the default arguments, then at depths 2 and 1, then at
     * depth 0 (each depth with the boolean flag true then false), comparing sizes through
     * {@link #assertEqualsSize(String, int, String[])}.
     *
     * @param label             message label forwarded to {@code assertEqualsSize}
     * @param tap               tap whose children are listed
     * @param conf              configuration handed to {@code getChildIdentifiers}
     * @param expectedNested    expected count for the default call and depths 2 and 1
     * @param expectedDepthZero expected count for depth 0
     */
    private void assertChildIdentifierCounts(String label, Hfs tap, Configuration conf, int expectedNested, int expectedDepthZero) throws IOException {
        assertEqualsSize(label, expectedNested, tap.getChildIdentifiers(conf));
        for (int depth = 2; depth >= 1; depth--) {
            assertEqualsSize(label, expectedNested, tap.getChildIdentifiers(conf, depth, true));
            assertEqualsSize(label, expectedNested, tap.getChildIdentifiers(conf, depth, false));
        }
        assertEqualsSize(label, expectedDepthZero, tap.getChildIdentifiers(conf, 0, true));
        assertEqualsSize(label, expectedDepthZero, tap.getChildIdentifiers(conf, 0, false));
    }

    /**
     * Asserts that {@code strArr} has exactly {@code i} elements, unless {@code i} is
     * {@code -1}, in which case no check is made.
     *
     * @param str    label included in the failure message
     * @param i      expected array length, or {@code -1} to skip the comparison
     * @param strArr array whose length is checked
     */
    public void assertEqualsSize(String str, int i, String[] strArr) {
        if (i == -1) {
            return;
        }
        // pass the label through; it was previously accepted but dropped, so failures
        // did not say which scenario broke
        assertEquals(str, i, strArr.length);
    }

    /**
     * Writes a single tuple {@code (1, "1")} through an {@code Hfs} text-line tap located at
     * {@code getOutputPath(str)}.
     *
     * @param str path, relative to the test output path, to write to
     * @throws IOException if the tap cannot be opened for writing
     */
    private void writeFileTo(String str) throws IOException {
        Hfs tap = new Hfs(new TextLine(new Fields("offset", "line")), getOutputPath(str));
        TupleEntryCollector collector = tap.openForWrite(getPlatform().getFlowProcess());
        try {
            collector.add(new Tuple(1, "1"));
        } finally {
            // always release the writer, even if adding the tuple fails
            collector.close();
        }
    }

    /**
     * Runs a count-by-ip flow whose source overrides {@code prepareResourceForRead} and whose
     * sink overrides {@code prepareResourceForWrite} to count invocations, and checks each was
     * called exactly once and the flow output validates at length 8.
     */
    @Test
    public void testPrepareResource() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        // single-element arrays so the anonymous subclasses can mutate the counters
        final int[] readPrepares = {0};
        final int[] writePrepares = {0};

        Tap source = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileApache) {
            public boolean prepareResourceForRead(Configuration configuration) throws IOException {
                readPrepares[0]++;
                return true;
            }
        };
        Tap sink = new Hfs(new TextDelimited(Fields.ALL), getOutputPath("preparetap"), SinkMode.REPLACE) {
            public boolean prepareResourceForWrite(Configuration configuration) throws IOException {
                writePrepares[0]++;
                return true;
            }
        };

        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
        pipe = new GroupBy(pipe, new Fields("ip"));
        pipe = new Every(pipe, new Count(), new Fields("ip", "count"));

        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        assertEquals(1, readPrepares[0]);
        assertEquals(1, writePrepares[0]);
        validateLength(flow, 8, null);
    }

    /**
     * Runs a count-by-ip flow whose source {@code prepareResourceForRead} always throws, and
     * expects completing the flow to raise an exception.
     */
    @Test
    public void testPrepareResourceForReadFails() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        try {
            Tap source = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileApache) {
                public boolean prepareResourceForRead(Configuration configuration) throws IOException {
                    throw new IOException("failed intentionally");
                }
            };
            Tap sink = new Hfs(new TextDelimited(Fields.ALL), getOutputPath("preparereadtapfail"), SinkMode.REPLACE);

            Pipe pipe = new Pipe("test");
            pipe = new Each(pipe, new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
            pipe = new GroupBy(pipe, new Fields("ip"));
            pipe = new Every(pipe, new Count(), new Fields("ip", "count"));

            getPlatform().getFlowConnector().connect(source, sink, pipe).complete();
            fail();
        } catch (Exception ignored) {
            // expected: the intentional read-preparation failure surfaces as an exception
        }
    }

    /**
     * Runs a count-by-ip flow whose sink {@code prepareResourceForWrite} always throws, and
     * expects completing the flow to raise an exception.
     */
    @Test
    public void testPrepareResourceForWriteFails() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        try {
            Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileApache);
            Tap sink = new Hfs(new TextDelimited(Fields.ALL), getOutputPath("preparewritetapfail"), SinkMode.REPLACE) {
                public boolean prepareResourceForWrite(Configuration configuration) throws IOException {
                    throw new IOException("failed intentionally");
                }
            };

            Pipe pipe = new Pipe("test");
            pipe = new Each(pipe, new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
            pipe = new GroupBy(pipe, new Fields("ip"));
            pipe = new Every(pipe, new Count(), new Fields("ip", "count"));

            getPlatform().getFlowConnector().connect(source, sink, pipe).complete();
            fail();
        } catch (Exception ignored) {
            // expected: the intentional write-preparation failure surfaces as an exception
        }
    }

    /**
     * Sinks a delimited file through a partition tap and checks that no {@code _temporary}
     * directory remains under the output path once the flow completes.
     */
    @Test
    public void testTemporarySinkPathIsDeleted() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLowerOffset);

        Tap source = getPlatform().getDelimitedFile(new Fields("a", "b"), " ", InputData.inputFileLowerOffset);
        Pipe tail = new Pipe("test");
        String sinkPath = getOutputPath("partition-tap-sink");

        Tap parentSink = getPlatform().getDelimitedFile(new Fields("a"), " ", sinkPath);
        Tap partitionSink = getPlatform().getPartitionTap(parentSink, new DelimitedPartition(new Fields("b")), 1);

        Flow flow = getPlatform().getFlowConnector().connect(source, partitionSink, tail);
        flow.complete();

        Path temporaryPath = new Path(sinkPath, "_temporary");
        Configuration conf = (Configuration) flow.getConfigCopy();
        assertFalse(temporaryPath.getFileSystem(conf).exists(temporaryPath));
    }
}
