package kafka.examples;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/* loaded from: input_file:kafka/examples/KafkaExactlyOnceDemo.class */
public class KafkaExactlyOnceDemo {
    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";
    public static final String GROUP_NAME = "check-group";

    public static void main(String[] strArr) {
        try {
            if (strArr.length != 3) {
                Utils.printHelp("This example takes 3 parameters (i.e. 6 3 10000):%n- partition: number of partitions for input and output topics (required)%n- instances: number of application instances (required)%n- records: total number of records (required)", new Object[0]);
                return;
            }
            int parseInt = Integer.parseInt(strArr[0]);
            int parseInt2 = Integer.parseInt(strArr[1]);
            int parseInt3 = Integer.parseInt(strArr[2]);
            Utils.recreateTopics(KafkaProperties.BOOTSTRAP_SERVERS, parseInt, INPUT_TOPIC, OUTPUT_TOPIC);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            Producer producer = new Producer("producer", KafkaProperties.BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true, parseInt3, -1, countDownLatch);
            producer.start();
            if (!countDownLatch.await(2L, TimeUnit.MINUTES)) {
                Utils.printErr("Timeout after 2 minutes waiting for data load", new Object[0]);
                producer.shutdown();
                return;
            }
            CountDownLatch countDownLatch2 = new CountDownLatch(parseInt2);
            List list = (List) IntStream.range(0, parseInt2).mapToObj(i -> {
                return new ExactlyOnceMessageProcessor("processor-" + i, KafkaProperties.BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, countDownLatch2);
            }).collect(Collectors.toList());
            list.forEach((v0) -> {
                v0.start();
            });
            if (!countDownLatch2.await(2L, TimeUnit.MINUTES)) {
                Utils.printErr("Timeout after 2 minutes waiting for record copy", new Object[0]);
                list.forEach((v0) -> {
                    v0.shutdown();
                });
                return;
            }
            CountDownLatch countDownLatch3 = new CountDownLatch(1);
            Consumer consumer = new Consumer("consumer", KafkaProperties.BOOTSTRAP_SERVERS, OUTPUT_TOPIC, GROUP_NAME, Optional.empty(), true, parseInt3, countDownLatch3);
            consumer.start();
            if (!countDownLatch3.await(2L, TimeUnit.MINUTES)) {
                Utils.printErr("Timeout after 2 minutes waiting for output read", new Object[0]);
                consumer.shutdown();
            }
        } catch (Throwable th) {
            th.printStackTrace();
        }
    }
}
