package cascading.flow.local;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.operation.FunctionCall;
import cascading.operation.Identity;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntryStream;
import data.InputData;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.junit.Test;

/* loaded from: input_file:cascading/flow/local/FlowPlatformTest.class */
public class FlowPlatformTest extends PlatformTestCase {
    @Test
    public void testStop() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache200);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache200);
        Pipe pipe = new Pipe("test");
        final int i = 10;
        final Semaphore semaphore = new Semaphore(0);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        Each each = new Each(pipe, new Fields(new Comparable[]{"line"}), new Identity(Fields.ARGS) { // from class: cascading.flow.local.FlowPlatformTest.1
            public void operate(FlowProcess flowProcess, FunctionCall<Identity.Functor> functionCall) {
                if (atomicInteger.getAndAdd(1) >= i) {
                    semaphore.release();
                } else {
                    super.operate(flowProcess, functionCall);
                }
            }
        });
        Tap textFile2 = getPlatform().getTextFile(getOutputPath("simple"), SinkMode.REPLACE);
        Flow connect = getPlatform().getFlowConnector().connect(textFile, textFile2, each);
        connect.start();
        semaphore.acquire();
        connect.stop();
        long count = TupleEntryStream.entryStream(textFile, connect.getFlowProcess()).count();
        long count2 = TupleEntryStream.entryStream(textFile2, connect.getFlowProcess()).count();
        assertNotSame(Long.valueOf(count), Long.valueOf(count2));
        assertEquals(10, count2);
    }
}
