package com.autumn.utils.messageQueueUtils;

import com.autumn.reporting.extentReport.Logger;
import com.autumn.utils.exceptions.MessageNotFoundInKafkaConsumerException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

/* loaded from: input_file:com/autumn/utils/messageQueueUtils/KafkaListener.class */
/**
 * Singleton helper that subscribes a Kafka consumer to a topic, drains the
 * messages it can read, and reports them through {@link Logger}.
 *
 * <p>Record values are consumed as raw bytes. Each value is decoded as an Avro
 * {@link GenericRecord} using a caller-supplied {@link Schema}; values that
 * cannot be decoded that way are kept as plain text instead.
 */
public class KafkaListener {
    /** Timeout, in milliseconds, handed to every {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 20L;

    /** Draining stops once the number of empty polls exceeds this value. */
    private static final int MAX_EMPTY_POLLS = 100;

    /** Message carried by the exception raised when a search finds nothing. */
    private static final String MESSAGE_NOT_FOUND = " Message not found in Kaftka Consumer ";

    private static volatile KafkaListener instance;

    private KafkaListener() {
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the single {@code KafkaListener} instance
     */
    public static KafkaListener getInstance() {
        if (instance == null) {
            synchronized (KafkaListener.class) {
                if (instance == null) {
                    instance = new KafkaListener();
                }
            }
        }
        return instance;
    }

    /**
     * Builds a consumer with String keys and byte-array values and subscribes
     * it to a single topic. Offsets reset to {@code earliest} when the group
     * has no committed position.
     *
     * @param str  consumer group id
     * @param str2 topic to subscribe to
     * @param str3 bootstrap server list
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    public Consumer<String, byte[]> createConsumer(String str, String str2, String str3) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str3);
        properties.put("group.id", str);
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", ByteArrayDeserializer.class.getName());
        properties.put("auto.offset.reset", "earliest");
        // NOTE(review): this looks like a schema-registry deserializer setting and
        // probably has no effect with ByteArrayDeserializer - confirm before removing.
        properties.setProperty("specific.avro.reader", "true");
        KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singletonList(str2));
        Logger.logInfoInLogger("Consumer connection done");
        return kafkaConsumer;
    }

    /**
     * Drains a topic and returns every message read, in the order received.
     *
     * <p>Polling continues until more than {@value #MAX_EMPTY_POLLS} polls have
     * come back empty. The empty-poll count is cumulative: it is not reset when
     * a later poll returns records. Offsets are committed asynchronously after
     * each non-empty batch. The consumer is always closed, including when
     * polling or committing fails.
     *
     * <p>Records whose value is {@code null} are skipped, because there is no
     * payload to decode.
     *
     * @param str    consumer group id
     * @param str2   topic to read from
     * @param str3   bootstrap server list
     * @param schema Avro schema used to decode each record value
     * @return the decoded messages; empty when nothing was read
     * @throws IOException declared for compatibility with existing callers
     */
    public synchronized ArrayList<String> runConsumer(String str, String str2, String str3, Schema schema) throws IOException {
        captureKafkaDetails(str2, str, str3);
        Logger.logInfoInLogger("Running Consumer --- " + str + " on Topic " + str2);
        ArrayList<String> messages = new ArrayList<>();
        Consumer<String, byte[]> consumer = createConsumer(str, str2, str3);
        try {
            int emptyPolls = 0;
            while (emptyPolls <= MAX_EMPTY_POLLS) {
                ConsumerRecords<String, byte[]> records = consumer.poll(POLL_TIMEOUT_MS);
                if (records.count() == 0) {
                    emptyPolls++;
                    continue;
                }
                for (ConsumerRecord<String, byte[]> consumerRecord : records) {
                    byte[] payload = consumerRecord.value();
                    if (payload == null) {
                        Logger.logInfoInLogger("Skipping Kafka record with null value");
                        continue;
                    }
                    messages.add(decodePayload(payload, schema));
                }
                consumer.commitAsync();
            }
        } finally {
            // Release the connection even when polling, decoding or committing throws.
            consumer.close();
        }
        capturekafkaConsumerMessages(messages);
        Logger.logInfoInLogger("DONE");
        return messages;
    }

    /**
     * Logs the connection details of a consumer run.
     *
     * @param str  topic name
     * @param str2 consumer group id
     * @param str3 bootstrap server list
     */
    public void captureKafkaDetails(String str, String str2, String str3) {
        Logger.logInfo("Kaftka details are :<br> Bootstrap Server : " + str3 + "<br>Kaftka Topic : " + str + "<br>KaftkaConsumer : " + str2);
    }

    /**
     * Logs all messages as a numbered list via {@code Logger.logInfo}.
     * Nothing is logged when the list is {@code null} or empty.
     *
     * @param arrayList messages to report
     */
    public void capturekafkaConsumerMessages(ArrayList<String> arrayList) {
        if (arrayList == null || arrayList.isEmpty()) {
            return;
        }
        Logger.logInfo("All Messages recieved in Kaftka consumer :<br> " + numberMessages(arrayList));
    }

    /**
     * Logs all messages as a numbered list via {@code Logger.logInfoInLogger}.
     * Nothing is logged when the list is {@code null} or empty.
     *
     * @param arrayList messages to report
     */
    public void logkafkaConsumerMessages(ArrayList<String> arrayList) {
        if (arrayList == null || arrayList.isEmpty()) {
            return;
        }
        Logger.logInfoInLogger("All Messages recieved in Kaftka consumer are :<br> " + numberMessages(arrayList));
    }

    /**
     * Logs the first message that contains the given text.
     *
     * @param arrayList messages to search
     * @param str       text the message must contain
     * @throws MessageNotFoundInKafkaConsumerException when no message contains {@code str}
     */
    public void captureSpecifickafkaConsumerMessage(ArrayList<String> arrayList, String str) {
        Logger.logInfo("Message stored in Kaftka consumer :<br> " + arrayList.get(getIndexofKafkaConsumerMessage(arrayList, str)));
    }

    /**
     * Finds the index of the first message that contains the given text.
     *
     * @param arrayList messages to search
     * @param str       text the message must contain
     * @return zero-based index of the first matching message
     * @throws MessageNotFoundInKafkaConsumerException when the list is empty or
     *         no message contains {@code str}
     */
    public int getIndexofKafkaConsumerMessage(ArrayList<String> arrayList, String str) {
        for (int i = 0; i < arrayList.size(); i++) {
            String message = arrayList.get(i);
            if (message != null && message.contains(str)) {
                return i;
            }
        }
        throw new MessageNotFoundInKafkaConsumerException((Object) MESSAGE_NOT_FOUND);
    }

    /** Decodes one record value as Avro, falling back to UTF-8 text when that fails. */
    private static String decodePayload(byte[] payload, Schema schema) {
        try {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, 0, payload.length, null);
            GenericRecord decoded = reader.read(null, decoder);
            return decoded.toString();
        } catch (Exception ignored) {
            // Deliberate best effort: a value that is not valid Avro for this schema
            // is still reported, as text, rather than aborting the whole drain.
            return new String(payload, StandardCharsets.UTF_8);
        }
    }

    /** Renders messages as "1) first\n2) second\n...", one numbered entry per line. */
    private static String numberMessages(ArrayList<String> messages) {
        StringBuilder numbered = new StringBuilder();
        for (int i = 0; i < messages.size(); i++) {
            numbered.append(i + 1).append(") ").append(messages.get(i)).append("\n");
        }
        return numbered.toString();
    }
}
