package dev.lydtech.component.framework.client.kafka;

import dev.lydtech.component.framework.configuration.TestcontainersConfiguration;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/lydtech/component/framework/client/kafka/KafkaClient.class */
/**
 * Singleton helper for producing to and consuming from a Kafka broker in tests.
 *
 * <p>The broker address is assembled from the {@code docker.host} system property (defaulting to
 * {@code localhost}) and the mandatory {@code kafka.mapped.port} system property. Producers and
 * consumers default to String keys and values; callers may override any setting by passing their
 * own {@link Properties}. When {@code TestcontainersConfiguration.KAFKA_SASL_PLAIN_ENABLED} is set,
 * SASL/PLAIN credentials from the same configuration class are added to every client.
 *
 * <p>Several public signatures deliberately keep the raw {@code Consumer}/{@code Producer} types:
 * callers can override the (de)serializers, so the real key/value types are not known here, and
 * narrowing the signatures would break existing call sites.
 */
public final class KafkaClient {
    private static final Logger log = LoggerFactory.getLogger(KafkaClient.class);

    /** JAAS line template: login module class name, username, password. */
    private static final String SASL_JAAS_TEMPLATE = "%s required username=\"%s\" password=\"%s\";";

    /** Timeout, in seconds, used by the {@code consumeAndAssert} overload that takes none. */
    private static final int DEFAULT_AWAIT_SECONDS = 60;

    private static KafkaClient instance;

    protected String brokerUrl = resolveBrokerUrl();

    /** Producer used by the {@code sendMessage*} overloads that do not take an explicit producer. */
    private final KafkaProducer<String, String> defaultProducer;

    private KafkaClient() {
        log.info("Kafka broker URL is: {}", this.brokerUrl);
        this.defaultProducer = createProducer();
    }

    /**
     * Returns the shared client, creating it (and its default producer) on first use.
     *
     * @return the singleton instance
     * @throws IllegalStateException if the {@code kafka.mapped.port} system property is not set
     */
    public static synchronized KafkaClient getInstance() {
        if (instance == null) {
            instance = new KafkaClient();
        }
        return instance;
    }

    /**
     * Returns the broker address handed to every client as {@code bootstrap.servers}.
     *
     * @return the broker URL, in the form {@code http://<host>:<port>}
     */
    public String getBrokerUrl() {
        return this.brokerUrl;
    }

    /**
     * Creates a consumer subscribed to {@code topic} using only the default configuration.
     *
     * @param groupIdPrefix prefix of the consumer group id; the group id is {@code groupIdPrefix-topic}
     * @param topic the topic to subscribe to
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    public Consumer createConsumer(String groupIdPrefix, String topic) {
        return createConsumer(groupIdPrefix, topic, null);
    }

    /**
     * Creates a consumer subscribed to {@code topic}.
     *
     * <p>Defaults: String key/value deserializers, {@code auto.offset.reset=latest},
     * {@code metadata.max.age.ms=1000}, plus SASL/PLAIN settings when enabled. Entries in
     * {@code overrides} are applied last and therefore win over every default.
     *
     * @param groupIdPrefix prefix of the consumer group id; the group id is {@code groupIdPrefix-topic}
     * @param topic the topic to subscribe to
     * @param overrides extra or replacement consumer settings; may be {@code null}
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    public Consumer createConsumer(String groupIdPrefix, String topic, Properties overrides) {
        Properties config = new Properties();
        config.put("bootstrap.servers", this.brokerUrl);
        config.put("group.id", groupIdPrefix + "-" + topic);
        config.put("key.deserializer", StringDeserializer.class);
        config.put("value.deserializer", StringDeserializer.class);
        config.put("auto.offset.reset", "latest");
        config.put("metadata.max.age.ms", 1000);
        applySaslPlainIfEnabled(config);
        applyOverrides(config, overrides);

        // Key/value types are left open because overrides may swap the deserializers.
        KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    /**
     * Creates a consumer with the default configuration and performs one initial poll.
     *
     * @param groupIdPrefix prefix of the consumer group id; the group id is {@code groupIdPrefix-topic}
     * @param topic the topic to subscribe to
     * @param initialPollSeconds maximum duration of the initial poll, in seconds; must not be {@code null}
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    public Consumer initConsumer(String groupIdPrefix, String topic, Long initialPollSeconds) {
        return initConsumer(groupIdPrefix, topic, null, initialPollSeconds);
    }

    /**
     * Creates a consumer and performs one initial poll whose result is discarded.
     *
     * <p>NOTE(review): the poll is presumably there to complete group join / partition assignment
     * before the test starts producing - confirm. Any records it returns are dropped.
     *
     * @param groupIdPrefix prefix of the consumer group id; the group id is {@code groupIdPrefix-topic}
     * @param topic the topic to subscribe to
     * @param overrides extra or replacement consumer settings; may be {@code null}
     * @param initialPollSeconds maximum duration of the initial poll, in seconds; must not be {@code null}
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    public Consumer initConsumer(String groupIdPrefix, String topic, Properties overrides, Long initialPollSeconds) {
        Consumer consumer = createConsumer(groupIdPrefix, topic, overrides);
        consumer.poll(Duration.ofSeconds(initialPollSeconds));
        return consumer;
    }

    /**
     * Creates a String/String producer using only the default configuration.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    public KafkaProducer<String, String> createProducer() {
        return createProducer(null);
    }

    /**
     * Creates a producer with String key/value serializers, plus SASL/PLAIN settings when enabled.
     * Entries in {@code overrides} are applied last and therefore win over every default.
     *
     * @param overrides extra or replacement producer settings; may be {@code null}
     * @return a new producer; the caller is responsible for closing it
     */
    public KafkaProducer<String, String> createProducer(Properties overrides) {
        Properties config = new Properties();
        config.put("bootstrap.servers", this.brokerUrl);
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);
        applySaslPlainIfEnabled(config);
        applyOverrides(config, overrides);
        return new KafkaProducer<>(config);
    }

    /**
     * Sends a record without headers via the default producer and waits for the broker's response.
     *
     * @param topic destination topic
     * @param key record key
     * @param payload record value
     * @return metadata of the written record
     * @throws Exception if the send fails or the wait is interrupted
     */
    public RecordMetadata sendMessage(String topic, String key, Object payload) throws Exception {
        return sendMessage(this.defaultProducer, topic, key, payload, null);
    }

    /**
     * Sends a record without headers via {@code producer} and waits for the broker's response.
     *
     * @param producer the producer to send with
     * @param topic destination topic
     * @param key record key
     * @param payload record value
     * @return metadata of the written record
     * @throws Exception if the send fails or the wait is interrupted
     */
    public RecordMetadata sendMessage(Producer producer, String topic, String key, Object payload) throws Exception {
        return sendMessage(producer, topic, key, payload, null);
    }

    /**
     * Sends a record with headers via the default producer and waits for the broker's response.
     *
     * @param topic destination topic
     * @param key record key
     * @param payload record value
     * @param headers header names mapped to values; may be {@code null}
     * @return metadata of the written record
     * @throws Exception if the send fails or the wait is interrupted
     */
    public RecordMetadata sendMessage(String topic, String key, Object payload, Map<String, String> headers) throws Exception {
        return sendMessage(this.defaultProducer, topic, key, payload, headers);
    }

    /**
     * Sends a record via {@code producer}, blocks until the send completes, and logs the outcome
     * at debug level.
     *
     * @param producer the producer to send with
     * @param topic destination topic
     * @param key record key
     * @param payload record value
     * @param headers header names mapped to values (encoded as UTF-8); may be {@code null}
     * @return metadata of the written record
     * @throws Exception if the send fails or the wait is interrupted
     */
    public RecordMetadata sendMessage(Producer producer, String topic, String key, Object payload, Map<String, String> headers) throws Exception {
        RecordMetadata metadata = sendMessageAsync(producer, topic, key, payload, headers).get();
        log.debug("Sent record(key={} value={}) meta(topic={}, partition={}, offset={})",
                key, payload, metadata.topic(), metadata.partition(), metadata.offset());
        return metadata;
    }

    /**
     * Sends a record without headers via the default producer, without waiting for completion.
     *
     * @param topic destination topic
     * @param key record key
     * @param payload record value
     * @return a future that yields the written record's metadata
     */
    public Future<RecordMetadata> sendMessageAsync(String topic, String key, Object payload) {
        return sendMessageAsync(this.defaultProducer, topic, key, payload, null);
    }

    /**
     * Sends a record without headers via {@code producer}, without waiting for completion.
     *
     * @param producer the producer to send with
     * @param topic destination topic
     * @param key record key
     * @param payload record value
     * @return a future that yields the written record's metadata
     */
    public Future<RecordMetadata> sendMessageAsync(Producer producer, String topic, String key, Object payload) {
        return sendMessageAsync(producer, topic, key, payload, null);
    }

    /**
     * Sends a record with headers via the default producer, without waiting for completion.
     *
     * @param topic destination topic
     * @param key record key
     * @param payload record value
     * @param headers header names mapped to values; may be {@code null}
     * @return a future that yields the written record's metadata
     */
    public Future<RecordMetadata> sendMessageAsync(String topic, String key, Object payload, Map<String, String> headers) {
        return sendMessageAsync(this.defaultProducer, topic, key, payload, headers);
    }

    /**
     * Sends a record via {@code producer} without waiting for completion. No partition is
     * specified on the record.
     *
     * @param producer the producer to send with
     * @param topic destination topic
     * @param key record key
     * @param payload record value
     * @param headers header names mapped to values (encoded as UTF-8); may be {@code null}
     * @return a future that yields the written record's metadata
     */
    // Raw producer/record types: the payload type depends on the caller-supplied producer.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public Future<RecordMetadata> sendMessageAsync(Producer producer, String topic, String key, Object payload, Map<String, String> headers) {
        ProducerRecord outbound = new ProducerRecord(topic, (Integer) null, key, payload, toRecordHeaders(headers));
        return producer.send(outbound);
    }

    /**
     * Same as {@link #consumeAndAssert(String, Consumer, int, int, int)} with a 60 second timeout.
     *
     * @param testName label prefixed to every log line
     * @param consumer the consumer to poll
     * @param expectedCount exact number of records that must be received
     * @param furtherPolls number of additional polls that must still show {@code expectedCount}
     * @param <T> value type of the consumed records (unchecked)
     * @return the records received, in arrival order
     * @throws Exception if the condition is not met before the timeout
     */
    public <T> List<ConsumerRecord<String, T>> consumeAndAssert(String testName, Consumer consumer, int expectedCount, int furtherPolls) throws Exception {
        return consumeAndAssert(testName, consumer, expectedCount, furtherPolls, DEFAULT_AWAIT_SECONDS);
    }

    /**
     * Polls {@code consumer} until exactly {@code expectedCount} records have been received and
     * that count has then held for {@code furtherPolls} additional polls.
     *
     * <p>The condition is evaluated once per second, each evaluation polling for up to 100 ms.
     * The further polls guard against unexpected extra records: if the total ever exceeds
     * {@code expectedCount} the condition can no longer become true and the await times out.
     *
     * @param testName label prefixed to every log line
     * @param consumer the consumer to poll
     * @param expectedCount exact number of records that must be received
     * @param furtherPolls number of additional polls that must still show {@code expectedCount};
     *     {@code 0} returns as soon as the count is reached
     * @param timeoutSeconds maximum time to wait, in seconds
     * @param <T> value type of the consumed records (unchecked)
     * @return the records received, in arrival order
     * @throws Exception if the condition is not met before the timeout (Awaitility's timeout
     *     exception) or polling fails
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <T> List<ConsumerRecord<String, T>> consumeAndAssert(String testName, Consumer consumer, int expectedCount, int furtherPolls, int timeoutSeconds) throws Exception {
        List<ConsumerRecord<String, T>> received = new ArrayList<>();
        // Starts at -1 so that the poll which first reaches expectedCount counts as "0 further polls".
        AtomicInteger pollsAtExpectedCount = new AtomicInteger(-1);
        AtomicInteger totalPolls = new AtomicInteger();

        Awaitility.await()
                .atMost(timeoutSeconds, TimeUnit.SECONDS)
                .pollInterval(1L, TimeUnit.SECONDS)
                .until(() -> {
                    for (Object polled : consumer.poll(Duration.ofMillis(100L))) {
                        ConsumerRecord<String, T> consumed = (ConsumerRecord<String, T>) polled;
                        log.info("{} - received: {}", testName, String.valueOf(consumed.value()));
                        received.add(consumed);
                    }
                    if (received.size() == expectedCount) {
                        pollsAtExpectedCount.incrementAndGet();
                    }
                    log.info("{} - poll count: {} - received count: {}",
                            testName, totalPolls.incrementAndGet(), received.size());
                    return received.size() == expectedCount && pollsAtExpectedCount.get() == furtherPolls;
                });
        return received;
    }

    /** Builds the broker URL from the {@code docker.host} and {@code kafka.mapped.port} system properties. */
    private static String resolveBrokerUrl() {
        String host = System.getProperty("docker.host", "localhost");
        String port = Optional.ofNullable(System.getProperty("kafka.mapped.port"))
                .orElseThrow(() -> new IllegalStateException("kafka.mapped.port property not found"));
        return "http://" + host + ":" + port;
    }

    /** Adds SASL/PLAIN client settings to {@code config} when enabled in the Testcontainers configuration. */
    private static void applySaslPlainIfEnabled(Properties config) {
        if (!TestcontainersConfiguration.KAFKA_SASL_PLAIN_ENABLED) {
            return;
        }
        config.put("security.protocol", "SASL_PLAINTEXT");
        config.put("sasl.mechanism", "PLAIN");
        config.put("sasl.jaas.config", String.format(SASL_JAAS_TEMPLATE,
                PlainLoginModule.class.getName(),
                TestcontainersConfiguration.KAFKA_SASL_PLAIN_USERNAME,
                TestcontainersConfiguration.KAFKA_SASL_PLAIN_PASSWORD));
    }

    /** Copies caller-supplied settings over the defaults; a {@code null} or empty set is a no-op. */
    private static void applyOverrides(Properties config, Properties overrides) {
        if (overrides != null) {
            config.putAll(overrides);
        }
    }

    /** Converts a name/value map into Kafka record headers, encoding values as UTF-8 ({@code null} stays {@code null}). */
    private static List<RecordHeader> toRecordHeaders(Map<String, String> headers) {
        List<RecordHeader> converted = new ArrayList<>();
        if (headers != null) {
            // Explicit charset: the no-arg getBytes() depends on the JVM's platform default.
            headers.forEach((name, value) -> converted.add(
                    new RecordHeader(name, value != null ? value.getBytes(StandardCharsets.UTF_8) : null)));
        }
        return converted;
    }
}
