package com.holly.unit.kafka.impl;

import cn.hutool.core.util.StrUtil;
import com.holly.unit.kafka.KafkaApi;
import com.holly.unit.kafka.config.KafkaConfig;
import com.holly.unit.kafka.exception.KafkaException;
import com.holly.unit.kafka.exception.enums.KafkaExceptionEnum;
import com.holly.unit.kafka.model.CallbackReceiveMessage;
import com.holly.unit.kafka.model.KlocalThread;
import com.holly.unit.kafka.model.Kmessage;
import com.holly.unit.kafka.model.KproducerType;
import com.holly.unit.kafka.model.Ktopic;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/holly/unit/kafka/impl/KafkaApiImpl.class */
public class KafkaApiImpl implements KafkaApi {
    private static final Logger log = LoggerFactory.getLogger(KafkaApiImpl.class);
    private static KafkaProducerPool kafkaProducerPool = null;
    private static volatile KafkaConfig kafkaConfig = null;

    public KafkaApiImpl(KafkaConfig kafkaConfig2) {
        if (kafkaConfig == null) {
            synchronized (getClass()) {
                if (kafkaConfig == null) {
                    kafkaConfig = kafkaConfig2;
                    kafkaProducerPool = new KafkaProducerPool(kafkaConfig);
                }
            }
        }
    }

    public KafkaApiImpl() {
    }

    /* JADX WARN: Finally extract failed */
    public void sendMessage(List<Kmessage> list) throws KafkaException {
        KafkaProducerPool.localVar.set(new KlocalThread(Long.toString(Thread.currentThread().getId()), KproducerType.PRODUCER_TYPE_NORMAL));
        KafkaProducer<String, Object> producer = kafkaProducerPool.getProducer(0L);
        try {
            if (list != null) {
                try {
                    if (!list.isEmpty()) {
                        for (Kmessage kmessage : list) {
                            producer.send(new ProducerRecord(kmessage.getTopic(), kmessage.getPartition(), kmessage.getKey(), kmessage.getVal()), new Callback() { // from class: com.holly.unit.kafka.impl.KafkaApiImpl.1
                                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                                    if (exc != null) {
                                        KafkaApiImpl.log.error("发送消息失败:{}", exc);
                                        throw new KafkaException(KafkaExceptionEnum.KAFKA_MQ_SEND_ERROR.getErrorCode(), StrUtil.format(KafkaExceptionEnum.KAFKA_MQ_SEND_ERROR.getUserTip(), new Object[]{recordMetadata}));
                                    }
                                    recordMetadata.topic();
                                    recordMetadata.partition();
                                    recordMetadata.offset();
                                }
                            });
                        }
                        kafkaProducerPool.freeProducer(producer);
                        return;
                    }
                } catch (Exception e) {
                    log.error("发送消息失败:{}", e);
                    throw new KafkaException(KafkaExceptionEnum.KAFKA_MQ_SEND_ERROR.getErrorCode(), StrUtil.format(KafkaExceptionEnum.KAFKA_MQ_SEND_ERROR.getUserTip(), new Object[]{e.getMessage()}));
                }
            }
            throw new KafkaException(KafkaExceptionEnum.KAFKA_MQ_SEND_ERROR.getErrorCode(), StrUtil.format(KafkaExceptionEnum.KAFKA_MQ_SEND_ERROR.getUserTip(), new Object[]{"消息不能为空"}));
        } catch (Throwable th) {
            kafkaProducerPool.freeProducer(producer);
            throw th;
        }
    }

    public void receiveMessage(String str, Integer num, String str2, CallbackReceiveMessage callbackReceiveMessage, boolean z) throws KafkaException {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaConfig.getBootstrapServers());
        properties.setProperty("group.id", str2);
        properties.put("client.id", Thread.currentThread().getName());
        ArrayList arrayList = new ArrayList();
        properties.setProperty("enable.auto.commit", "false");
        if (z) {
            properties.setProperty("enable.auto.commit", "true");
            properties.setProperty("auto.commit.interval.ms", kafkaConfig.getAutoCommitInterval());
        }
        properties.setProperty("key.deserializer", kafkaConfig.keyDeserializer);
        properties.setProperty("value.deserializer", kafkaConfig.valueDeserializer);
        properties.setProperty("auto.offset.reset", kafkaConfig.autoOffsetReset);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        try {
            try {
                if (num == null) {
                    kafkaConsumer.subscribe(Arrays.asList(str));
                } else {
                    ArrayList arrayList2 = new ArrayList();
                    arrayList2.add(new TopicPartition(str, num.intValue()));
                    kafkaConsumer.assign(arrayList2);
                }
                while (atomicBoolean.get()) {
                    arrayList.clear();
                    ConsumerRecords poll = kafkaConsumer.poll(Duration.ofSeconds(1L));
                    if (!poll.isEmpty()) {
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            Object key = consumerRecord.key();
                            Object value = consumerRecord.value();
                            Kmessage kmessage = new Kmessage();
                            kmessage.setTopic(consumerRecord.topic());
                            kmessage.setPartition(Integer.valueOf(consumerRecord.partition()));
                            kmessage.setKey(key);
                            kmessage.setOffset(Long.valueOf(consumerRecord.offset()));
                            kmessage.setVal(value);
                            arrayList.add(kmessage);
                        }
                        if (z) {
                            callbackReceiveMessage.receiveMessage(arrayList, (Object) null, atomicBoolean);
                        } else {
                            callbackReceiveMessage.receiveMessage(arrayList, kafkaConsumer, atomicBoolean);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("获取全部topic失败:{}", e);
                throw new KafkaException(KafkaExceptionEnum.KAFKA_MQ_RECEIVE_ERROR.getErrorCode(), StrUtil.format(KafkaExceptionEnum.KAFKA_MQ_RECEIVE_ERROR.getUserTip(), new Object[]{e.getMessage()}));
            }
        } finally {
            kafkaConsumer.close();
        }
    }

    public void sendMessageWithTranactions(List<Kmessage> list) throws KafkaException {
        KafkaProducerPool.localVar.set(new KlocalThread("holl_" + kafkaConfig.getTranactionalId(), KproducerType.PRODUCER_TYPE_TRANCTION));
        KafkaProducer<String, Object> producer = kafkaProducerPool.getProducer(0L);
        if (list == null || list.isEmpty()) {
            throw new KafkaException(KafkaExceptionEnum.KAFKA_MQ_SEND_ERROR.getErrorCode(), StrUtil.format(KafkaExceptionEnum.KAFKA_MQ_SEND_ERROR.getUserTip(), new Object[]{"发送消息不能为空"}));
        }
        try {
            producer.beginTransaction();
            for (Kmessage kmessage : list) {
                producer.send(new ProducerRecord(kmessage.getTopic(), kmessage.getPartition(), kmessage.getKey(), kmessage.getVal()));
            }
            producer.commitTransaction();
        } catch (Exception e) {
            producer.abortTransaction();
            log.error("事务发送数据:{}", e);
            producer.close();
            throw new KafkaException(KafkaExceptionEnum.KAFKA_MQ_SEND_ERROR.getErrorCode(), StrUtil.format(KafkaExceptionEnum.KAFKA_MQ_SEND_ERROR.getUserTip(), new Object[]{e.getMessage()}));
        }
    }

    public void topicTransformation(String str, String str2, Ktopic ktopic, Ktopic ktopic2, CallbackReceiveMessage callbackReceiveMessage) {
        KafkaConsumer kafkaConsumer = null;
        try {
            if (str2 == null) {
                try {
                    str2 = kafkaConfig.tranactionalId;
                } catch (Exception e) {
                    log.error("kafkaFromT2T:{}", e);
                    throw new KafkaException(KafkaExceptionEnum.KAFKA_MQ_SEND_ERROR.getErrorCode(), StrUtil.format(KafkaExceptionEnum.KAFKA_MQ_SEND_ERROR.getUserTip(), new Object[]{e.getMessage()}));
                }
            }
            KafkaProducerPool.localVar.set(new KlocalThread("holl_" + str2, KproducerType.PRODUCER_TYPE_TRANCTION));
            KafkaProducer<String, Object> producer = kafkaProducerPool.getProducer(0L);
            KafkaConsumer<String, String> makeConsumer = makeConsumer(str, ktopic);
            ArrayList arrayList = new ArrayList();
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            while (atomicBoolean.get()) {
                try {
                    arrayList.clear();
                    producer.beginTransaction();
                    HashMap hashMap = new HashMap();
                    Iterator it = makeConsumer.poll(Duration.ofSeconds(5L)).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        try {
                            Object key = consumerRecord.key();
                            Object value = consumerRecord.value();
                            long offset = consumerRecord.offset();
                            Kmessage kmessage = new Kmessage();
                            kmessage.setTopic(consumerRecord.topic());
                            kmessage.setPartition(Integer.valueOf(consumerRecord.partition()));
                            kmessage.setKey(key);
                            kmessage.setVal(value);
                            arrayList.add(kmessage);
                            hashMap.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(offset + 1));
                            callbackReceiveMessage.receiveMessage(arrayList, makeConsumer, atomicBoolean);
                            producer.send(new ProducerRecord(ktopic2.getTopic(), ktopic2.getPartition(), kmessage.getKey(), kmessage.getVal())).get();
                        } catch (Exception e2) {
                            producer.abortTransaction();
                            throw new RuntimeException(e2);
                        }
                    }
                    producer.sendOffsetsToTransaction(hashMap, str);
                    producer.commitTransaction();
                } catch (Exception e3) {
                    throw new RuntimeException(e3);
                }
            }
            kafkaProducerPool.removeProducer();
            if (makeConsumer != null) {
                makeConsumer.close();
            }
        } catch (Throwable th) {
            kafkaProducerPool.removeProducer();
            if (0 != 0) {
                kafkaConsumer.close();
            }
            throw th;
        }
    }

    private KafkaConsumer<String, String> makeConsumer(String str, Ktopic ktopic) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaConfig.getBootstrapServers());
        properties.setProperty("group.id", str);
        properties.put("client.id", Thread.currentThread().getName());
        properties.put("isolation.level", "read_committed");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", kafkaConfig.getAutoCommitInterval());
        properties.setProperty("key.deserializer", kafkaConfig.keyDeserializer);
        properties.setProperty("value.deserializer", kafkaConfig.valueDeserializer);
        properties.setProperty("auto.offset.reset", kafkaConfig.autoOffsetReset);
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        if (ktopic.getPartition() == null) {
            kafkaConsumer.subscribe(Arrays.asList(ktopic.getTopic()));
        } else {
            ArrayList arrayList = new ArrayList();
            arrayList.add(new TopicPartition(ktopic.getTopic(), ktopic.getPartition().intValue()));
            kafkaConsumer.assign(arrayList);
        }
        return kafkaConsumer;
    }
}
