package com.jeesuite.kafka.producer;

import com.jeesuite.kafka.KafkaConst;
import com.jeesuite.kafka.message.DefaultMessage;
import com.jeesuite.kafka.producer.handler.ProducerEventHandler;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.lang3.Validate;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jeesuite/kafka/producer/DefaultTopicProducer.class */
public class DefaultTopicProducer implements TopicProducer, Closeable {
    private static final Logger log = LoggerFactory.getLogger(DefaultTopicProducer.class);
    private KafkaProducer<String, Object> kafkaProducer;
    private List<ProducerEventHandler> eventHanlders = new ArrayList();
    private ZkClient zkClient;
    private boolean consumerAckEnabled;

    public DefaultTopicProducer(KafkaProducer<String, Object> kafkaProducer, ZkClient zkClient, boolean z) {
        this.kafkaProducer = kafkaProducer;
        this.zkClient = zkClient;
        this.consumerAckEnabled = z;
        if (this.zkClient == null || !this.consumerAckEnabled) {
            return;
        }
        String substring = KafkaConst.ZK_PRODUCER_ACK_PATH.substring(0, KafkaConst.ZK_PRODUCER_ACK_PATH.length() - 1);
        if (!this.zkClient.exists(substring)) {
            this.zkClient.createPersistent(substring, true);
        }
        log.info("consumer watcher rootPath:{}", substring);
    }

    @Override // com.jeesuite.kafka.producer.TopicProducer
    public void addEventHandler(ProducerEventHandler producerEventHandler) {
        this.eventHanlders.add(producerEventHandler);
    }

    @Override // com.jeesuite.kafka.producer.TopicProducer
    public boolean publish(String str, DefaultMessage defaultMessage, boolean z) {
        Validate.notNull(str, "Topic is required", new Object[0]);
        Validate.notNull(defaultMessage, "Message is required", new Object[0]);
        boolean z2 = this.consumerAckEnabled && defaultMessage.isConsumerAckRequired();
        if (!z || z2) {
            doSyncSend(str, defaultMessage.getMsgId(), defaultMessage);
        } else {
            doAsynSend(str, defaultMessage.getMsgId(), defaultMessage);
        }
        if (!z2) {
            return true;
        }
        if (this.zkClient == null) {
            log.warn("Message set consumerAck = true,but not zookeeper config[kafka.zkServers] found!!!");
            return true;
        }
        log.debug("wait_consumer_ack,messageId:{}", defaultMessage.getMsgId());
        new ConsumerAckWatcher(defaultMessage.getMsgId(), this.zkClient).waitAck();
        return true;
    }

    private boolean doSyncSend(String str, String str2, DefaultMessage defaultMessage) {
        try {
            RecordMetadata recordMetadata = (RecordMetadata) this.kafkaProducer.send(new ProducerRecord(str, str2, defaultMessage.sendBodyOnly() ? defaultMessage.getBody() : defaultMessage)).get();
            Iterator<ProducerEventHandler> it = this.eventHanlders.iterator();
            while (it.hasNext()) {
                try {
                    it.next().onSuccessed(str, recordMetadata);
                } catch (Exception e) {
                }
            }
            if (!log.isDebugEnabled()) {
                return true;
            }
            log.debug("kafka_send_success,topic=" + str + ", messageId=" + str2 + ", partition=" + recordMetadata.partition() + ", offset=" + recordMetadata.offset());
            return true;
        } catch (Exception e2) {
            log.error("kafka_send_fail,topic=" + str + ",messageId=" + str2, e2);
            throw new RuntimeException(e2);
        }
    }

    private void doAsynSend(final String str, final String str2, final DefaultMessage defaultMessage) {
        this.kafkaProducer.send(new ProducerRecord(str, str2, defaultMessage.sendBodyOnly() ? defaultMessage.getBody() : defaultMessage), new Callback() { // from class: com.jeesuite.kafka.producer.DefaultTopicProducer.1
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc != null) {
                    Iterator it = DefaultTopicProducer.this.eventHanlders.iterator();
                    while (it.hasNext()) {
                        try {
                            ((ProducerEventHandler) it.next()).onError(str, defaultMessage);
                        } catch (Exception e) {
                        }
                    }
                    DefaultTopicProducer.log.error("kafka_send_fail,topic=" + str + ",messageId=" + str2, exc);
                    return;
                }
                Iterator it2 = DefaultTopicProducer.this.eventHanlders.iterator();
                while (it2.hasNext()) {
                    try {
                        ((ProducerEventHandler) it2.next()).onSuccessed(str, recordMetadata);
                    } catch (Exception e2) {
                    }
                }
                if (DefaultTopicProducer.log.isDebugEnabled()) {
                    DefaultTopicProducer.log.debug("kafka_send_success,topic=" + str + ", messageId=" + str2 + ", partition=" + recordMetadata.partition() + ", offset=" + recordMetadata.offset());
                }
            }
        });
    }

    @Override // com.jeesuite.kafka.producer.TopicProducer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.kafkaProducer.close();
        Iterator<ProducerEventHandler> it = this.eventHanlders.iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (Exception e) {
            }
        }
    }
}
