package cascading.tuple.hadoop;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexParser;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.SequenceFile;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.coerce.Coercions;
import cascading.tuple.hadoop.util.BytesComparator;
import data.InputData;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.junit.Test;

/* loaded from: input_file:cascading/tuple/hadoop/SerializedPipesPlatformTest.class */
public class SerializedPipesPlatformTest extends PlatformTestCase {

    /* loaded from: input_file:cascading/tuple/hadoop/SerializedPipesPlatformTest$Container.class */
    public static class Container implements Serializable, Comparable<String> {
        String value;

        public Container(String str) {
            this.value = str;
        }

        @Override // java.lang.Comparable
        public int compareTo(String str) {
            return this.value.compareTo(str);
        }
    }

    /* loaded from: input_file:cascading/tuple/hadoop/SerializedPipesPlatformTest$InsertBoolean.class */
    public static class InsertBoolean extends BaseOperation implements Function {
        boolean asBoolean;

        public InsertBoolean(Fields fields, boolean z) {
            super(fields);
            this.asBoolean = z;
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            functionCall.getOutputCollector().add(new Tuple(new Object[]{new BooleanWritable(this.asBoolean)}));
        }
    }

    /* loaded from: input_file:cascading/tuple/hadoop/SerializedPipesPlatformTest$InsertBytes.class */
    public static class InsertBytes extends BaseOperation implements Function {
        String asBytes;

        public InsertBytes(Fields fields, String str) {
            super(fields);
            this.asBytes = str;
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            functionCall.getOutputCollector().add(new Tuple(new Object[]{new BytesWritable(this.asBytes.getBytes())}));
        }
    }

    /* loaded from: input_file:cascading/tuple/hadoop/SerializedPipesPlatformTest$InsertRawBytes.class */
    public static class InsertRawBytes extends BaseOperation<Long> implements Function<Long> {
        String asBytes;
        private boolean increment;
        private boolean randomIncrement;

        public InsertRawBytes(Fields fields, String str, boolean z, boolean z2) {
            super(fields);
            this.increment = false;
            this.randomIncrement = false;
            this.asBytes = str;
            this.increment = z;
            this.randomIncrement = z2;
        }

        public InsertRawBytes(Fields fields, String str, boolean z) {
            super(fields);
            this.increment = false;
            this.randomIncrement = false;
            this.asBytes = str;
            this.increment = z;
        }

        public void prepare(FlowProcess flowProcess, OperationCall<Long> operationCall) {
            operationCall.setContext(Long.valueOf(this.increment ? getIncrement(0L) : -1L));
        }

        private long getIncrement(long j) {
            return this.randomIncrement ? j + ((long) (Math.random() * new Object().hashCode())) : j + 1;
        }

        public void operate(FlowProcess flowProcess, FunctionCall<Long> functionCall) {
            String str = this.asBytes;
            if (((Long) functionCall.getContext()).longValue() != -1) {
                str = functionCall.getContext() + str;
                functionCall.setContext(Long.valueOf(((Long) functionCall.getContext()).longValue() + 1));
            }
            functionCall.getOutputCollector().add(new Tuple(new Object[]{str.getBytes()}));
        }
    }

    /* loaded from: input_file:cascading/tuple/hadoop/SerializedPipesPlatformTest$InsertTestText.class */
    public static class InsertTestText extends BaseOperation<Long> implements Function<Long> {
        private String testText;
        private boolean increment;
        private int moduloValueIsNull;
        private int moduloResultIsNull;

        public InsertTestText(Fields fields, String str, boolean z) {
            this(fields, str, z, -1, -1);
        }

        public InsertTestText(Fields fields, String str, boolean z, int i, int i2) {
            super(fields);
            this.testText = str;
            this.increment = z;
            this.moduloValueIsNull = i;
            this.moduloResultIsNull = i2;
        }

        public void prepare(FlowProcess flowProcess, OperationCall<Long> operationCall) {
            operationCall.setContext(Long.valueOf(this.increment ? 0L : -1L));
        }

        public void operate(FlowProcess flowProcess, FunctionCall<Long> functionCall) {
            String str = this.testText;
            if (((Long) functionCall.getContext()).longValue() != -1) {
                str = functionCall.getContext() + str;
                functionCall.setContext(Long.valueOf(((Long) functionCall.getContext()).longValue() + 1));
                if (this.moduloValueIsNull != -1 && ((Long) functionCall.getContext()).longValue() % this.moduloValueIsNull == 0) {
                    str = null;
                }
            }
            TestText testText = null;
            if (this.moduloResultIsNull != -1 && ((Long) functionCall.getContext()).longValue() % this.moduloResultIsNull != 0) {
                testText = new TestText(str);
            }
            functionCall.getOutputCollector().add(new Tuple(new Object[]{testText}));
        }
    }

    /* loaded from: input_file:cascading/tuple/hadoop/SerializedPipesPlatformTest$ReplaceAsBytes.class */
    public static class ReplaceAsBytes extends BaseOperation implements Function {
        public ReplaceAsBytes(Fields fields) {
            super(fields);
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            functionCall.getOutputCollector().add(new Tuple(new Object[]{new BytesWritable(functionCall.getArguments().getString(0).getBytes())}));
        }
    }

    public SerializedPipesPlatformTest() {
        super(true, 4, 2);
    }

    @Test
    public void testSimpleGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileApache);
        Each each = new Each(new Every(new GroupBy(new Each(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"})), new InsertBytes(new Fields(new Comparable[]{"bytes"}), "inserted text as bytes"), Fields.ALL), new Fields(new Comparable[]{"ip"})), new Count(), new Fields(new Comparable[]{"ip", "count"})), new InsertBoolean(new Fields(new Comparable[]{"boolean"}), false), Fields.ALL);
        Hfs hfs2 = new Hfs(new SequenceFile(Fields.ALL), getOutputPath("serialization"), SinkMode.REPLACE);
        Map properties = getProperties();
        TupleSerializationProps.addSerializationToken(properties, 1000, BooleanWritable.class.getName());
        Flow connect = getPlatform().getFlowConnector(properties).connect(hfs, hfs2, each);
        connect.complete();
        validateLength(connect.openSource(), 10);
        validateLength(connect, 8, null);
    }

    @Test
    public void testSimpleGroupOnBytes() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileApache);
        Each each = new Each(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"})), new InsertRawBytes(new Fields(new Comparable[]{"bytes"}), "inserted text as bytes", true, true), Fields.ALL);
        Fields fields = new Fields(new Comparable[]{"bytes"});
        fields.setComparator("bytes", new BytesComparator());
        Every every = new Every(new GroupBy(each, fields), new Count(), new Fields(new Comparable[]{"bytes", "count"}));
        Hfs hfs2 = new Hfs(new SequenceFile(Fields.ALL), getOutputPath("grouponbytes"), SinkMode.REPLACE);
        Map properties = getProperties();
        TupleSerializationProps.addSerialization(properties, BytesSerialization.class.getName());
        Flow connect = getPlatform().getFlowConnector(properties).connect(hfs, hfs2, every);
        connect.complete();
        validateLength(connect, 10);
    }

    @Test
    public void testCoGroupWritableAsKeyValue() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileLower);
        Hfs hfs2 = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", hfs);
        hashMap.put("upper", hfs2);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector(getProperties()).connect(hashMap, new Hfs(new SequenceFile(Fields.ALL), getOutputPath("writablekeyvalue"), SinkMode.REPLACE), new CoGroup(new Each(new Each(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new InsertBytes(new Fields(new Comparable[]{"group"}), "inserted text as bytes"), Fields.ALL), new InsertBytes(new Fields(new Comparable[]{"value"}), "inserted text as bytes"), Fields.ALL), new Fields(new Comparable[]{"group"}), new Each(new Each(new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new InsertBytes(new Fields(new Comparable[]{"group"}), "inserted text as bytes"), Fields.ALL), new InsertBytes(new Fields(new Comparable[]{"value"}), "inserted text as bytes"), Fields.ALL), new Fields(new Comparable[]{"group"}), Fields.size(8)));
        connect.complete();
        validateLength(connect, 25);
    }

    @Test
    public void testCoGroupBytesWritableAsKeyValue() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileLower);
        Hfs hfs2 = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", hfs);
        hashMap.put("upper", hfs2);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Flow connect = getPlatform().getFlowConnector(getProperties()).connect(hashMap, new Hfs(new TextLine(new Fields(new Comparable[]{"line"})), getOutputPath("byteswritablekeyvalue"), SinkMode.REPLACE), new CoGroup(new Each(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"char"}), new ReplaceAsBytes(new Fields(new Comparable[]{"char"})), Fields.REPLACE), new Fields(new Comparable[]{"num"}), new Each(new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new Fields(new Comparable[]{"char"}), new ReplaceAsBytes(new Fields(new Comparable[]{"char"})), Fields.REPLACE), new Fields(new Comparable[]{"num"}), Fields.size(4)));
        connect.complete();
        validateLength(connect, 5);
        assertTrue(getSinkAsList(connect).contains(new Tuple(new Object[]{"1\t61\t1\t41"})));
    }

    @Test
    public void testCoGroupSpillCustomWritable() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileLower);
        Hfs hfs2 = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", hfs);
        hashMap.put("upper", hfs2);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Hfs hfs3 = new Hfs(new SequenceFile(Fields.ALL), getOutputPath("customerwritable"), SinkMode.REPLACE);
        CoGroup coGroup = new CoGroup(new Each(new Each(new Each(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new InsertTestText(new Fields(new Comparable[]{"group"}), "inserted text as bytes", false), Fields.ALL), new InsertTestText(new Fields(new Comparable[]{"value"}), "inserted text as bytes", false), Fields.ALL), new InsertTestText(new Fields(new Comparable[]{"text"}), "inserted text as custom text", false), Fields.ALL), new Fields(new Comparable[]{"group"}), new Each(new Each(new Each(new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new InsertTestText(new Fields(new Comparable[]{"group"}), "inserted text as bytes", false), Fields.ALL), new InsertTestText(new Fields(new Comparable[]{"value"}), "inserted text as bytes", false), Fields.ALL), new InsertTestText(new Fields(new Comparable[]{"text"}), "inserted text as custom text", false), Fields.ALL), new Fields(new Comparable[]{"group"}), Fields.size(10));
        Map properties = getProperties();
        properties.put("cascading.spill.list.threshold", 1);
        properties.put("io.serializations", TestSerialization.class.getName());
        Flow connect = getPlatform().getFlowConnector(properties).connect(hashMap, hfs3, coGroup);
        connect.complete();
        validateLength(connect, 25);
    }

    @Test
    public void testCoGroupRawAsKeyValue() throws Exception {
        invokeRawAsKeyValue(false, true, false, false);
    }

    @Test
    public void testCoGroupRawAsKeyValueDefault() throws Exception {
        invokeRawAsKeyValue(true, true, false, false);
    }

    @Test
    public void testCoGroupRawAsKeyValueDefaultIgnoreToken() throws Exception {
        invokeRawAsKeyValue(true, true, true, false);
    }

    @Test
    public void testCoGroupRawAsKeyValueDefaultIgnoreTokenCompositeGrouping() throws Exception {
        invokeRawAsKeyValue(true, true, true, true);
    }

    @Test
    public void testCoGroupRawAsKeyValueNoSecondary() throws Exception {
        invokeRawAsKeyValue(false, false, false, false);
    }

    @Test
    public void testCoGroupRawAsKeyValueDefaultNoSecondary() throws Exception {
        invokeRawAsKeyValue(true, false, false, false);
    }

    @Test
    public void testCoGroupRawAsKeyValueDefaultNoSecondaryCompositeGrouping() throws Exception {
        invokeRawAsKeyValue(true, false, false, true);
    }

    private void invokeRawAsKeyValue(boolean z, boolean z2, boolean z3, boolean z4) throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileLower);
        Hfs hfs2 = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"})), InputData.inputFileUpper);
        HashMap hashMap = new HashMap();
        hashMap.put("lower", hfs);
        hashMap.put("upper", hfs2);
        RegexSplitter regexSplitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Hfs hfs3 = new Hfs(new SequenceFile(new Fields(new Comparable[]{"num", "char", "group", "value", "num2", "char2", "group2", "value2"})), getOutputPath("/rawbyteskeyvalue/" + z + "/" + z2 + "/" + z3 + "/" + z4), SinkMode.REPLACE);
        Each each = new Each(new Each(new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), regexSplitter), new InsertTestText(new Fields(new Comparable[]{"group"}), "inserted text as bytes", true, 3, 4), Fields.ALL), new InsertRawBytes(new Fields(new Comparable[]{"value"}), "inserted text as bytes", true), Fields.ALL);
        Each each2 = new Each(new Each(new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), regexSplitter), new InsertTestText(new Fields(new Comparable[]{"group"}), "inserted text as bytes", true, 3, 4), Fields.ALL), new InsertRawBytes(new Fields(new Comparable[]{"value"}), "inserted text as bytes", true), Fields.ALL);
        Fields fields = new Fields(new Comparable[]{"group"});
        if (z4) {
            fields = new Fields(new Comparable[]{"group", "num"});
        }
        if (!z) {
            fields.setComparator("group", new TestTextComparator());
        }
        CoGroup coGroup = new CoGroup(each, fields, each2, fields, new Fields(new Comparable[]{"num", "char", "group", "value", "num2", "char2", "group2", "value2"}));
        Fields fields2 = new Fields(new Comparable[]{"value"});
        if (!z) {
            fields2.setComparator("value", new BytesComparator());
        }
        GroupBy groupBy = z2 ? new GroupBy(coGroup, fields, fields2) : new GroupBy(coGroup, fields);
        Map properties = getProperties();
        if (z3) {
            TupleSerializationProps.addSerialization(properties, NoTokenTestSerialization.class.getName());
            TupleSerializationProps.addSerialization(properties, NoTokenTestBytesSerialization.class.getName());
        } else {
            TupleSerializationProps.addSerialization(properties, TestSerialization.class.getName());
            TupleSerializationProps.addSerialization(properties, BytesSerialization.class.getName());
        }
        getPlatform().setNumMapTasks(properties, 1);
        getPlatform().setNumReduceTasks(properties, 1);
        getPlatform().setNumGatherPartitionTasks(properties, 1);
        Flow connect = getPlatform().getFlowConnector(properties).connect(hashMap, hfs3, groupBy);
        connect.complete();
        validateLength(connect, 5);
        TupleEntryIterator openSink = connect.openSink();
        TestText testText = (TestText) ((TupleEntry) openSink.next()).getObject("group");
        String str = testText == null ? null : testText.value;
        while (true) {
            String str2 = str;
            if (!openSink.hasNext()) {
                openSink.close();
                return;
            }
            TestText testText2 = (TestText) ((TupleEntry) openSink.next()).getObject("group");
            String str3 = testText2 == null ? null : testText2.value;
            if (str2 != null && str2.compareTo(str3) >= 0) {
                fail("not increasing: " + str2 + " " + str2);
            }
            str = str3;
        }
    }

    @Test
    public void testBigDecimal() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Hfs hfs = new Hfs(new TextLine(new Fields(new Comparable[]{"offset", "line"}).applyTypes(new Type[]{Coercions.BIG_DECIMAL, String.class})), InputData.inputFileApache);
        Every every = new Every(new GroupBy(new Each(new Pipe("test"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"offset", "ip"})), new Fields(new Comparable[]{"offset"})), new Count(), new Fields(new Comparable[]{"offset", "count"}));
        Hfs hfs2 = new Hfs(new SequenceFile(new Fields(new Comparable[]{"offset", "count"}).applyTypes(new Type[]{Coercions.BIG_DECIMAL, Long.TYPE})), getOutputPath("bigdecimal"), SinkMode.REPLACE);
        Map properties = getProperties();
        TupleSerializationProps.addSerialization(properties, BigDecimalSerialization.class.getName());
        Flow connect = getPlatform().getFlowConnector(properties).connect(hfs, hfs2, every);
        connect.complete();
        validateLength(connect, 10);
    }
}
