package com.jeesuite.kafka.producer;

import com.jeesuite.kafka.message.DefaultMessage;
import com.jeesuite.kafka.producer.handler.ProducerEventHandler;
import java.io.Closeable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.lang3.Validate;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jeesuite/kafka/producer/DefaultTopicProducer.class */
/**
 * {@link TopicProducer} implementation that publishes messages through a
 * {@link KafkaProducer} and reports the outcome of each {@link DefaultMessage}
 * send to the registered {@link ProducerEventHandler}s.
 *
 * <p>Handlers are kept in a {@link CopyOnWriteArrayList} because asynchronous
 * completion callbacks iterate the list while {@link #addEventHandler} may be
 * mutating it from another thread (Kafka documents that callbacks generally run
 * on the producer's I/O thread).
 *
 * <p>A handler that throws is logged and skipped; it never changes the reported
 * result of a send and never prevents the remaining handlers from running.
 */
public class DefaultTopicProducer implements TopicProducer, Closeable {
    private static final Logger log = LoggerFactory.getLogger(DefaultTopicProducer.class);

    private final KafkaProducer<String, Object> kafkaProducer;
    private final List<ProducerEventHandler> eventHandlers = new CopyOnWriteArrayList<>();

    /**
     * Creates a producer that delegates to the given Kafka client.
     *
     * @param kafkaProducer the Kafka client used for every send; closed by {@link #close()}
     * @param z             accepted for signature compatibility; this class does not read it
     */
    public DefaultTopicProducer(KafkaProducer<String, Object> kafkaProducer, boolean z) {
        this.kafkaProducer = kafkaProducer;
    }

    /**
     * Registers a handler that is notified about the outcome of {@link #publish} calls.
     *
     * @param producerEventHandler the handler to register; must not be {@code null}
     * @throws NullPointerException if {@code producerEventHandler} is {@code null}
     */
    @Override // com.jeesuite.kafka.producer.TopicProducer
    public void addEventHandler(ProducerEventHandler producerEventHandler) {
        // Fail at registration time instead of on every later notification.
        Validate.notNull(producerEventHandler, "Event handler is required");
        this.eventHandlers.add(producerEventHandler);
    }

    /**
     * Publishes a message to a topic, keyed by the message id.
     *
     * @param topic     the destination topic; must not be {@code null}
     * @param message   the message to send; must not be {@code null}
     * @param asynSend  {@code true} to hand the record to Kafka and return immediately,
     *                  {@code false} to block until the send completes
     * @return for a blocking send, whether the send succeeded; for an asynchronous send,
     *         whether the record was handed to the Kafka client without an exception
     *         (the final outcome is reported to the handlers only)
     * @throws NullPointerException if {@code topic} or {@code message} is {@code null}
     */
    @Override // com.jeesuite.kafka.producer.TopicProducer
    public boolean publish(String topic, DefaultMessage message, boolean asynSend) {
        Validate.notNull(topic, "Topic is required");
        Validate.notNull(message, "Message is required");
        String messageId = message.getMsgId();
        if (!asynSend) {
            return doSyncSend(topic, messageId, message);
        }
        try {
            doAsynSend(topic, messageId, message);
            return true;
        } catch (Exception e) {
            // The client rejected the record before queuing it, so no callback will fire.
            handleSendFailure(topic, messageId, message, true, e);
            return false;
        }
    }

    /** Sends the record and blocks until Kafka acknowledges or rejects it. */
    private boolean doSyncSend(String topic, String messageId, DefaultMessage message) {
        RecordMetadata metadata;
        try {
            metadata = this.kafkaProducer.send(new ProducerRecord<String, Object>(topic, messageId, message)).get();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            handleSendFailure(topic, messageId, message, false, e);
            return false;
        } catch (Exception e) {
            handleSendFailure(topic, messageId, message, false, e);
            return false;
        }
        // Notify outside the try block: a failing handler must not turn a
        // delivered message into a reported failure.
        handleSendSuccess(topic, messageId, metadata);
        return true;
    }

    /** Hands the record to Kafka; the outcome is reported to the handlers from the completion callback. */
    private void doAsynSend(final String topic, final String messageId, final DefaultMessage message) {
        this.kafkaProducer.send(new ProducerRecord<String, Object>(topic, messageId, message), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc != null) {
                    handleSendFailure(topic, messageId, message, true, exc);
                } else {
                    handleSendSuccess(topic, messageId, recordMetadata);
                }
            }
        });
    }

    /** Notifies the handlers of a successful send, then writes the debug log entry. */
    private void handleSendSuccess(String topic, String messageId, RecordMetadata metadata) {
        for (ProducerEventHandler handler : this.eventHandlers) {
            try {
                handler.onSuccessed(topic, metadata);
            } catch (RuntimeException e) {
                log.warn("kafka_event_handler_fail,event=onSuccessed,topic={},handler={}", topic, handler, e);
            }
        }
        logSuccess(topic, messageId, metadata);
    }

    /** Notifies the handlers of a failed send, then writes the error log entry. */
    private void handleSendFailure(String topic, String messageId, DefaultMessage message, boolean asynSend,
            Exception cause) {
        for (ProducerEventHandler handler : this.eventHandlers) {
            try {
                handler.onError(topic, message, asynSend);
            } catch (RuntimeException e) {
                log.warn("kafka_event_handler_fail,event=onError,topic={},handler={}", topic, handler, e);
            }
        }
        logFailure(topic, messageId, cause);
    }

    /** Writes the debug entry for a completed send. */
    private static void logSuccess(String topic, String messageId, RecordMetadata metadata) {
        if (log.isDebugEnabled()) {
            log.debug("kafka_send_success,topic={}, messageId={}, partition={}, offset={}",
                    topic, messageId, metadata.partition(), metadata.offset());
        }
    }

    /** Writes the error entry for a failed send, including the stack trace of the cause. */
    private static void logFailure(String topic, String messageId, Exception cause) {
        log.error("kafka_send_fail,topic={},messageId={}", topic, messageId, cause);
    }

    /**
     * Closes the Kafka client and then every registered handler.
     *
     * <p>Handlers are closed even if closing the Kafka client throws; a handler that
     * fails to close is logged and does not stop the remaining handlers from closing.
     */
    @Override // com.jeesuite.kafka.producer.TopicProducer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            this.kafkaProducer.close();
        } finally {
            for (ProducerEventHandler handler : this.eventHandlers) {
                try {
                    handler.close();
                } catch (Exception e) {
                    log.warn("kafka_event_handler_close_fail,handler={}", handler, e);
                }
            }
        }
    }

    /**
     * Publishes an arbitrary serializable payload, keyed by a freshly generated random UUID.
     *
     * <p>Registered event handlers are not notified by this method; the outcome is only logged.
     *
     * @param topic        the destination topic
     * @param serializable the payload to send as the record value
     * @param asynSend     {@code true} to hand the record to Kafka and return immediately,
     *                     {@code false} to block until the send completes
     * @return always {@code true} when the method returns normally
     * @throws RuntimeException if a blocking send fails; the original exception is the cause
     */
    @Override // com.jeesuite.kafka.producer.TopicProducer
    public boolean publishNoWrapperObject(final String topic, Serializable serializable, boolean asynSend) {
        final String messageId = UUID.randomUUID().toString();
        ProducerRecord<String, Object> record = new ProducerRecord<String, Object>(topic, messageId, serializable);
        if (asynSend) {
            this.kafkaProducer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc != null) {
                        logFailure(topic, messageId, exc);
                    } else {
                        logSuccess(topic, messageId, recordMetadata);
                    }
                }
            });
            return true;
        }
        try {
            RecordMetadata metadata = this.kafkaProducer.send(record).get();
            logSuccess(topic, messageId, metadata);
            return true;
        } catch (InterruptedException e) {
            // Restore the flag before translating to the unchecked exception callers expect.
            Thread.currentThread().interrupt();
            logFailure(topic, messageId, e);
            throw new RuntimeException(e);
        } catch (Exception e) {
            logFailure(topic, messageId, e);
            throw new RuntimeException(e);
        }
    }
}
