package org.apache.kafka.streams.perf;

import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

/* loaded from: input_file:org/apache/kafka/streams/perf/SimpleBenchmark.class */
public class SimpleBenchmark {
    private final String kafka;
    private final String zookeeper;
    private final File stateDir;
    private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
    private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
    private static final long NUM_RECORDS = 10000000;
    private static final int KEY_SIZE = 8;
    private static final int VALUE_SIZE = 100;
    private static final int RECORD_SIZE = 108;
    private static final Long END_KEY = 9999999L;
    private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
    private static final Serde<Long> LONG_SERDE = Serdes.Long();

    public SimpleBenchmark(File file, String str, String str2) {
        this.stateDir = file;
        this.kafka = str;
        this.zookeeper = str2;
    }

    public static void main(String[] strArr) throws Exception {
        File file = new File("/tmp/kafka-streams-simple-benchmark");
        file.mkdir();
        new File(file, "rocksdb-test").mkdir();
        SimpleBenchmark simpleBenchmark = new SimpleBenchmark(file, "localhost:9092", "localhost:2181");
        simpleBenchmark.produce();
        simpleBenchmark.consume();
        simpleBenchmark.processStream();
        simpleBenchmark.processStreamWithSink();
        simpleBenchmark.processStreamWithStateStore();
    }

    public void processStream() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        final KafkaStreams createKafkaStreams = createKafkaStreams(this.stateDir, this.kafka, this.zookeeper, countDownLatch);
        Thread thread = new Thread() { // from class: org.apache.kafka.streams.perf.SimpleBenchmark.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                createKafkaStreams.start();
            }
        };
        thread.start();
        long currentTimeMillis = System.currentTimeMillis();
        while (countDownLatch.getCount() > 0) {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                Thread.interrupted();
            }
        }
        System.out.println("Streams Performance [MB/sec read]: " + megaBytePerSec(System.currentTimeMillis() - currentTimeMillis));
        createKafkaStreams.close();
        try {
            thread.join();
        } catch (Exception e2) {
        }
    }

    public void processStreamWithSink() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        final KafkaStreams createKafkaStreamsWithSink = createKafkaStreamsWithSink(this.stateDir, this.kafka, this.zookeeper, countDownLatch);
        Thread thread = new Thread() { // from class: org.apache.kafka.streams.perf.SimpleBenchmark.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                createKafkaStreamsWithSink.start();
            }
        };
        thread.start();
        long currentTimeMillis = System.currentTimeMillis();
        while (countDownLatch.getCount() > 0) {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                Thread.interrupted();
            }
        }
        System.out.println("Streams Performance [MB/sec read+write]: " + megaBytePerSec(System.currentTimeMillis() - currentTimeMillis));
        createKafkaStreamsWithSink.close();
        try {
            thread.join();
        } catch (Exception e2) {
        }
    }

    public void processStreamWithStateStore() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        final KafkaStreams createKafkaStreamsWithStateStore = createKafkaStreamsWithStateStore(this.stateDir, this.kafka, this.zookeeper, countDownLatch);
        Thread thread = new Thread() { // from class: org.apache.kafka.streams.perf.SimpleBenchmark.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                createKafkaStreamsWithStateStore.start();
            }
        };
        thread.start();
        long currentTimeMillis = System.currentTimeMillis();
        while (countDownLatch.getCount() > 0) {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                Thread.interrupted();
            }
        }
        System.out.println("Streams Performance [MB/sec read+store]: " + megaBytePerSec(System.currentTimeMillis() - currentTimeMillis));
        createKafkaStreamsWithStateStore.close();
        try {
            thread.join();
        } catch (Exception e2) {
        }
    }

    public void produce() {
        Properties properties = new Properties();
        properties.put("client.id", "simple-benchmark-produce");
        properties.put("bootstrap.servers", this.kafka);
        properties.put("key.serializer", LongSerializer.class);
        properties.put("value.serializer", ByteArraySerializer.class);
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        byte[] bArr = new byte[100];
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < NUM_RECORDS; i++) {
            kafkaProducer.send(new ProducerRecord(SOURCE_TOPIC, Long.valueOf(i), bArr));
        }
        kafkaProducer.close();
        System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(System.currentTimeMillis() - currentTimeMillis));
    }

    public void consume() {
        Properties properties = new Properties();
        properties.put("client.id", "simple-benchmark-consumer");
        properties.put("bootstrap.servers", this.kafka);
        properties.put("key.deserializer", LongDeserializer.class);
        properties.put("value.deserializer", ByteArrayDeserializer.class);
        KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(properties);
        List<TopicPartition> allPartitions = getAllPartitions(kafkaConsumer, SOURCE_TOPIC);
        kafkaConsumer.assign(allPartitions);
        kafkaConsumer.seekToBeginning(allPartitions);
        Long l = null;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            ConsumerRecords poll = kafkaConsumer.poll(500L);
            if (!poll.isEmpty()) {
                Iterator it = poll.iterator();
                while (it.hasNext()) {
                    l = (Long) ((ConsumerRecord) it.next()).key();
                }
            } else if (END_KEY.equals(l)) {
                long currentTimeMillis2 = System.currentTimeMillis();
                kafkaConsumer.close();
                System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(currentTimeMillis2 - currentTimeMillis));
                return;
            }
        }
    }

    private KafkaStreams createKafkaStreams(File file, String str, String str2, final CountDownLatch countDownLatch) {
        Properties properties = new Properties();
        properties.put("application.id", "simple-benchmark-streams");
        properties.put("state.dir", file.toString());
        properties.put("bootstrap.servers", str);
        properties.put("zookeeper.connect", str2);
        properties.put("num.stream.threads", 1);
        properties.put("auto.offset.reset", "earliest");
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        kStreamBuilder.stream(LONG_SERDE, BYTE_SERDE, new String[]{SOURCE_TOPIC}).process(new ProcessorSupplier<Long, byte[]>() { // from class: org.apache.kafka.streams.perf.SimpleBenchmark.4
            public Processor<Long, byte[]> get() {
                return new Processor<Long, byte[]>() { // from class: org.apache.kafka.streams.perf.SimpleBenchmark.4.1
                    public void init(ProcessorContext processorContext) {
                    }

                    public void process(Long l, byte[] bArr) {
                        if (SimpleBenchmark.END_KEY.equals(l)) {
                            countDownLatch.countDown();
                        }
                    }

                    public void punctuate(long j) {
                    }

                    public void close() {
                    }
                };
            }
        }, new String[0]);
        return new KafkaStreams(kStreamBuilder, properties);
    }

    private KafkaStreams createKafkaStreamsWithSink(File file, String str, String str2, final CountDownLatch countDownLatch) {
        Properties properties = new Properties();
        properties.put("application.id", "simple-benchmark-streams-with-sink");
        properties.put("state.dir", file.toString());
        properties.put("bootstrap.servers", str);
        properties.put("zookeeper.connect", str2);
        properties.put("num.stream.threads", 1);
        properties.put("auto.offset.reset", "earliest");
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream stream = kStreamBuilder.stream(LONG_SERDE, BYTE_SERDE, new String[]{SOURCE_TOPIC});
        stream.to(LONG_SERDE, BYTE_SERDE, SINK_TOPIC);
        stream.process(new ProcessorSupplier<Long, byte[]>() { // from class: org.apache.kafka.streams.perf.SimpleBenchmark.5
            public Processor<Long, byte[]> get() {
                return new Processor<Long, byte[]>() { // from class: org.apache.kafka.streams.perf.SimpleBenchmark.5.1
                    public void init(ProcessorContext processorContext) {
                    }

                    public void process(Long l, byte[] bArr) {
                        if (SimpleBenchmark.END_KEY.equals(l)) {
                            countDownLatch.countDown();
                        }
                    }

                    public void punctuate(long j) {
                    }

                    public void close() {
                    }
                };
            }
        }, new String[0]);
        return new KafkaStreams(kStreamBuilder, properties);
    }

    private KafkaStreams createKafkaStreamsWithStateStore(File file, String str, String str2, final CountDownLatch countDownLatch) {
        Properties properties = new Properties();
        properties.put("application.id", "simple-benchmark-streams-with-store");
        properties.put("state.dir", file.toString());
        properties.put("bootstrap.servers", str);
        properties.put("zookeeper.connect", str2);
        properties.put("num.stream.threads", 1);
        properties.put("auto.offset.reset", "earliest");
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        kStreamBuilder.addStateStore(Stores.create("store").withLongKeys().withByteArrayValues().persistent().build(), new String[0]);
        kStreamBuilder.stream(LONG_SERDE, BYTE_SERDE, new String[]{SOURCE_TOPIC}).process(new ProcessorSupplier<Long, byte[]>() { // from class: org.apache.kafka.streams.perf.SimpleBenchmark.6
            public Processor<Long, byte[]> get() {
                return new Processor<Long, byte[]>() { // from class: org.apache.kafka.streams.perf.SimpleBenchmark.6.1
                    KeyValueStore<Long, byte[]> store;

                    public void init(ProcessorContext processorContext) {
                        this.store = processorContext.getStateStore("store");
                    }

                    public void process(Long l, byte[] bArr) {
                        this.store.put(l, bArr);
                        if (SimpleBenchmark.END_KEY.equals(l)) {
                            countDownLatch.countDown();
                        }
                    }

                    public void punctuate(long j) {
                    }

                    public void close() {
                    }
                };
            }
        }, new String[]{"store"});
        return new KafkaStreams(kStreamBuilder, properties);
    }

    private double megaBytePerSec(long j) {
        return 1029.0d / (j / 1000.0d);
    }

    private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> kafkaConsumer, String... strArr) {
        ArrayList arrayList = new ArrayList();
        for (String str : strArr) {
            for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(str)) {
                arrayList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
        }
        return arrayList;
    }
}
