package arp.message.kafka;

import arp.process.publish.Message;
import arp.process.publish.ProcessMessageReceiver;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.LockSupport;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/* loaded from: input_file:arp/message/kafka/KafkaMessageReceiver.class */
/**
 * {@link ProcessMessageReceiver} that reads messages from Kafka.
 *
 * <p>Each call to {@link #subscribeProcess(String)} creates a dedicated
 * {@link KafkaConsumer} subscribed to the topic named after the process;
 * {@link #receive()} polls all of those consumers without blocking and
 * returns whatever records were immediately available.
 *
 * <p>{@link #receive()} and {@link #subscribeProcess(String)} are
 * {@code synchronized} on this instance, so the consumer list is never
 * polled and modified at the same time.
 */
public class KafkaMessageReceiver implements ProcessMessageReceiver {
    /** Receiver name; used as the suffix of every consumer group id. */
    private final String name;
    /** Value passed to Kafka as {@code bootstrap.servers}. */
    private final String servers;
    /** One consumer per subscribed process, in subscription order. */
    private final List<KafkaConsumer<String, Object>> consumers = new ArrayList<>();
    /** Number of messages returned by the previous {@link #receive()} call. */
    private int lastReceiveMessageCount;
    /** Admin client used only to list the topics known to the cluster. */
    private final AdminClient adminClient;
    /** Converts consumer records into messages and configures the value deserializer. */
    private final KafkaMessageDeserializationStrategy<Object> deserializationStrategy;

    /**
     * Creates a receiver that uses {@link FSTDeserializationStrategy}.
     *
     * @param servers Kafka {@code bootstrap.servers} value
     * @param name    receiver name, appended to every consumer group id
     */
    public KafkaMessageReceiver(String servers, String name) {
        this(servers, name, new FSTDeserializationStrategy());
    }

    /**
     * Creates a receiver with an explicit deserialization strategy.
     *
     * @param servers                 Kafka {@code bootstrap.servers} value
     * @param name                    receiver name, appended to every consumer group id
     * @param deserializationStrategy strategy used to configure consumers and to
     *                                turn each record into a message
     */
    public KafkaMessageReceiver(String servers, String name,
            KafkaMessageDeserializationStrategy<?> deserializationStrategy) {
        this.name = name;
        this.servers = servers;

        // The field is typed on Object because consumers are created as
        // KafkaConsumer<String, Object>; the wildcard strategy is only ever
        // handed records produced by the deserializer it configured itself.
        @SuppressWarnings("unchecked")
        KafkaMessageDeserializationStrategy<Object> strategy =
                (KafkaMessageDeserializationStrategy<Object>) deserializationStrategy;
        this.deserializationStrategy = strategy;

        Properties adminProperties = new Properties();
        adminProperties.put("bootstrap.servers", servers);
        this.adminClient = KafkaAdminClient.create(adminProperties);
    }

    /**
     * Polls every subscribed consumer once, without waiting, and returns the
     * deserialized messages.
     *
     * <p>If the previous call returned nothing, the thread is parked for one
     * nanosecond first so that an idle polling loop briefly gives up the CPU.
     *
     * @return the messages read in this call; empty when nothing was available
     * @throws Exception declared for interface compatibility; anything thrown by
     *                   the consumer or the deserialization strategy propagates
     */
    public synchronized List<Message> receive() throws Exception {
        if (this.lastReceiveMessageCount == 0) {
            LockSupport.parkNanos(1L);
        }

        List<Message> messages = new ArrayList<>();
        for (KafkaConsumer<String, Object> consumer : this.consumers) {
            for (ConsumerRecord<String, Object> record : consumer.poll(Duration.ZERO)) {
                // Deserialize each record exactly once. The explicit cast keeps the
                // typed list sound regardless of how wide deserialize's declared
                // return type is.
                messages.add((Message) this.deserializationStrategy.deserialize(record));
            }
        }

        this.lastReceiveMessageCount = messages.size();
        return messages;
    }

    /**
     * Builds a consumer for the given group id with a String key deserializer and
     * the value deserializer chosen by the deserialization strategy.
     */
    private KafkaConsumer<String, Object> createConsumer(String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.servers);
        properties.put("group.id", groupId);
        properties.put("key.deserializer", StringDeserializer.class);
        this.deserializationStrategy.configValueDeserializerClass(properties);
        return new KafkaConsumer<>(properties);
    }

    /**
     * Starts receiving messages for a process.
     *
     * <p>A new consumer is created with group id {@code processName + "FOR" + name}
     * and subscribed to the topic called {@code processName}.
     *
     * @param processName name of the process; also used as the topic name
     */
    public synchronized void subscribeProcess(String processName) {
        KafkaConsumer<String, Object> consumer = createConsumer(processName + "FOR" + this.name);
        try {
            List<String> topics = new ArrayList<>();
            topics.add(processName);
            consumer.subscribe(topics);
        } catch (RuntimeException e) {
            // Do not leak the consumer's sockets and threads when subscription fails.
            consumer.close();
            throw e;
        }
        this.consumers.add(consumer);
    }

    /**
     * Lists the names of all topics reported by the cluster.
     *
     * @return a new mutable list of topic names, or {@code null} if the lookup
     *         failed for any reason (callers must check for {@code null})
     */
    public List<String> queryAllProcessesToSubscribe() {
        try {
            Set<String> topicNames = this.adminClient.listTopics().names().get();
            return new ArrayList<>(topicNames);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt so the caller's thread can still observe it.
                Thread.currentThread().interrupt();
            }
            // Best-effort lookup: failure is reported as null, as before.
            return null;
        }
    }
}
