package cascading.local.tap.kafka;

import cascading.CascadingTestCase;
import cascading.flow.FlowProcess;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import java.io.IOException;
import java.util.function.Predicate;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;

/**
 * Integration test that writes tuples to a Kafka topic through {@link KafkaTap} and reads
 * them back, using a broker started by the Testcontainers {@link KafkaContainer} class rule.
 */
public class KafkaTapIntegrationTest extends CascadingTestCase {

    /** Number of tuples written to the topic by each test. */
    private static final int TUPLE_COUNT = 100;

    @ClassRule
    public static KafkaContainer kafka = new KafkaContainer("5.1.0");

    /** Round-trips tuples with the default scheme and expects position 3 to hold a String. */
    @Test
    public void writeRead() throws Exception {
        handle(new TextKafkaScheme(),
                tupleEntry -> tupleEntry.getObject(3) instanceof String,
                "my-test-default");
    }

    /** Round-trips tuples with explicitly typed fields and expects position 3 to hold an Integer. */
    @Test
    public void writeReadTyped() throws Exception {
        Fields typedFields = new Fields("topic", String.class)
                .append(new Fields("partition", Integer.class))
                .append(new Fields("offset", Long.class))
                .append(new Fields("key", Integer.class)
                        .append(new Fields("value", Integer.class))
                        .append(new Fields("timestamp", Long.class))
                        .append(new Fields("tsType", String.class)));

        handle(new TextKafkaScheme(typedFields),
                tupleEntry -> tupleEntry.getObject(3) instanceof Integer,
                "my-test-typed");
    }

    /**
     * Writes {@link #TUPLE_COUNT} tuples to the topic {@code str + "-topic"} and then reads three times:
     * <ol>
     *   <li>with the writing tap, counting entries accepted by {@code predicate}; expects all tuples;</li>
     *   <li>with the same tap again; expects zero entries (presumably because the client id
     *       "test-client" has already consumed the topic; verify against KafkaTap);</li>
     *   <li>with a new tap using client id "test-client-2" and the topic string
     *       {@code "/" + str + "-.*&#47;"} (apparently a topic name pattern); expects all tuples.</li>
     * </ol>
     *
     * @param textKafkaScheme scheme handed to every {@link KafkaTap} created here
     * @param predicate       check applied to each entry of the first read; counting stops at the first failure
     * @param str             prefix used to build the topic name and the topic pattern
     * @throws IOException if opening or closing a tap resource fails
     */
    private void handle(TextKafkaScheme textKafkaScheme, Predicate<TupleEntry> predicate, String str) throws IOException {
        String bootstrapServers = kafka.getBootstrapServers();
        // Raw KafkaTap type is kept on purpose: it matches the original call sites and avoids
        // guessing the tap's key/value type parameters.
        KafkaTap writeTap = new KafkaTap(textKafkaScheme, bootstrapServers, "test-client", new String[]{str + "-topic"});

        // try-with-resources closes the collector exactly once, also when add() throws.
        try (TupleEntryCollector collector = writeTap.openForWrite(FlowProcess.nullFlowProcess())) {
            for (int i = 0; i < TUPLE_COUNT; i++) {
                collector.add(new Tuple(new Object[]{Integer.valueOf(i), Integer.valueOf(i)}));
            }
        }

        assertEquals(TUPLE_COUNT, countWhile(writeTap, predicate));

        // Second pass over the same tap is expected to yield nothing.
        assertEquals(0, countWhile(writeTap, entry -> entry != null));

        KafkaTap patternTap = new KafkaTap(textKafkaScheme, bootstrapServers, "test-client-2", new String[]{"/" + str + "-.*/"});
        assertEquals(TUPLE_COUNT, countWhile(patternTap, entry -> entry != null));
    }

    /**
     * Opens {@code tap} for reading and counts consecutive entries for which {@code condition}
     * holds, stopping at the first entry that fails it or when the iterator is exhausted.
     * The iterator is closed once, after the loop, regardless of how the loop ends.
     *
     * @param tap       tap to read from
     * @param condition test applied to every entry returned by the iterator
     * @return number of entries accepted before the loop stopped
     * @throws IOException if opening or closing the iterator fails
     */
    private static int countWhile(KafkaTap tap, Predicate<TupleEntry> condition) throws IOException {
        int count = 0;
        try (TupleEntryIterator iterator = tap.openForRead(FlowProcess.nullFlowProcess())) {
            while (iterator.hasNext() && condition.test(iterator.next())) {
                count++;
            }
        }
        return count;
    }
}
