package com.jeesuite.kafka.spring;

import com.jeesuite.common.util.NodeNameHolder;
import com.jeesuite.common.util.ResourceUtils;
import com.jeesuite.kafka.KafkaConst;
import com.jeesuite.kafka.message.DefaultMessage;
import com.jeesuite.kafka.partiton.DefaultPartitioner;
import com.jeesuite.kafka.producer.DefaultTopicProducer;
import com.jeesuite.kafka.producer.TopicProducer;
import com.jeesuite.kafka.producer.handler.SendCounterHandler;
import com.jeesuite.kafka.producer.handler.SendErrorDelayRetryHandler;
import com.jeesuite.kafka.serializer.KyroMessageSerializer;
import com.jeesuite.kafka.serializer.ZKStringSerializer;
import java.io.Serializable;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:com/jeesuite/kafka/spring/TopicProducerSpringProvider.class */
public class TopicProducerSpringProvider implements InitializingBean, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(TopicProducerSpringProvider.class);
    private TopicProducer producer;
    private Properties configs;
    private String producerGroup;
    private String routeEnv;
    private ZkClient zkClient;
    private boolean defaultAsynSend = true;
    private boolean monitorEnabled = false;
    private int delayRetries = 3;
    private boolean consumerAckEnabled = true;

    public void afterPropertiesSet() throws Exception {
        Validate.notEmpty(this.configs, "configs is required", new Object[0]);
        this.routeEnv = StringUtils.trimToNull(ResourceUtils.getProperty(KafkaConst.PROP_ENV_ROUTE));
        if (this.routeEnv != null) {
            log.info("current route Env value is:", this.routeEnv);
        }
        for (String str : this.configs.stringPropertyNames()) {
            String property = this.configs.getProperty(str);
            if (StringUtils.isBlank(property) || property.trim().startsWith("$")) {
                this.configs.remove(str);
                log.warn("remove prop[{}],value is:{}", str, property);
            }
        }
        if (!this.configs.containsKey("key.serializer")) {
            this.configs.put("key.serializer", StringSerializer.class.getName());
        }
        if (!this.configs.containsKey("value.serializer")) {
            this.configs.put("value.serializer", KyroMessageSerializer.class.getName());
        }
        if (!this.configs.containsKey("partitioner.class")) {
            this.configs.put("partitioner.class", DefaultPartitioner.class.getName());
        }
        if (!this.configs.containsKey("retries")) {
            this.configs.put("retries", "1");
        }
        if (!this.configs.containsKey("compression.type")) {
            this.configs.put("compression.type", "snappy");
        }
        if (!this.configs.containsKey("client.id")) {
            this.configs.put("client.id", (this.producerGroup == null ? "" : "_" + this.producerGroup) + NodeNameHolder.getNodeId());
        }
        KafkaProducer kafkaProducer = new KafkaProducer(this.configs);
        String property2 = ResourceUtils.getProperty("kafka.zkServers");
        if (StringUtils.isNotBlank(property2)) {
            this.zkClient = new ZkClient(property2, 10000, 5000, new ZKStringSerializer());
        }
        this.producer = new DefaultTopicProducer(kafkaProducer, this.zkClient, this.consumerAckEnabled);
        if (this.monitorEnabled) {
            Validate.notBlank(this.producerGroup, "enable producer monitor property[producerGroup] is required", new Object[0]);
            Validate.notNull(this.zkClient, "enable producer monitor property[kafka.zkServers] is required", new Object[0]);
            this.producer.addEventHandler(new SendCounterHandler(this.producerGroup, this.zkClient));
        }
        if (this.delayRetries > 0) {
            this.producer.addEventHandler(new SendErrorDelayRetryHandler(this.producerGroup, kafkaProducer, this.delayRetries));
        }
    }

    public void destroy() throws Exception {
        this.producer.close();
    }

    public void setConfigs(Properties properties) {
        this.configs = properties;
    }

    public void setDefaultAsynSend(boolean z) {
        this.defaultAsynSend = z;
    }

    public void setProducerGroup(String str) {
        this.producerGroup = str;
    }

    public void setMonitorEnabled(boolean z) {
        this.monitorEnabled = z;
    }

    public void setDelayRetries(int i) {
        this.delayRetries = i;
    }

    public void setConsumerAckEnabled(boolean z) {
        this.consumerAckEnabled = z;
    }

    public boolean publish(String str, DefaultMessage defaultMessage) {
        return publish(str, defaultMessage, this.defaultAsynSend);
    }

    public boolean publish(String str, DefaultMessage defaultMessage, boolean z) {
        if (this.routeEnv != null) {
            str = this.routeEnv + "." + str;
        }
        return this.producer.publish(str, defaultMessage, z);
    }

    public boolean publishNoWrapperMessage(String str, Serializable serializable) {
        return publishNoWrapperMessage(str, serializable, this.defaultAsynSend);
    }

    public boolean publishNoWrapperMessage(String str, Serializable serializable, boolean z) {
        return publishNoWrapperMessage(str, null, serializable, z);
    }

    public boolean publishNoWrapperMessage(String str, String str2, Serializable serializable, boolean z) {
        DefaultMessage sendBodyOnly = new DefaultMessage(str2, serializable).sendBodyOnly(true);
        if (this.routeEnv != null) {
            str = this.routeEnv + "." + str;
        }
        return this.producer.publish(str, sendBodyOnly, z);
    }
}
