package org.cg.eventbus.producer;

import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
import org.cg.eventbus.ConfigUtil;
import org.cg.eventbus.ICallback;
import org.cg.eventbus.IProducer;

/* loaded from: input_file:org/cg/eventbus/producer/AbstractEventProducer.class */
/**
 * Base class for producers that publish key/value events to a single Kafka topic
 * through a {@link KafkaProducer}.
 *
 * <p>The producer is configured from a commons-configuration {@link Configuration}:
 * the configuration is handed to {@code ProducerConfigurator.validate}, converted to
 * {@link Properties} for the Kafka client, and the target topic is read from the
 * {@code IProducer.PRODUCER_TOPIC} key. If the {@value #QUIT_ON_ERROR} key is present
 * and true, a failed send terminates the JVM with exit status 1.
 *
 * <p>NOTE(review): {@code getKey(V)} is called by {@link #send(Object, ICallback)} but is
 * not declared in this class; presumably it is declared by {@code IProducer} and
 * implemented by subclasses — confirm.
 *
 * @param <K> type of the record key
 * @param <V> type of the record value (the event)
 */
public abstract class AbstractEventProducer<K, V> implements IProducer<K, V> {
    // The constants below, other than QUIT_ON_ERROR, are not referenced inside this class.
    // NOTE(review): presumably consumed by ProducerConfigurator, subclasses or callers — confirm.
    public static final String DEFAULT_BROKER_LIST = "localhost:9092";
    public static final String PRO_TYPE_ASYNC = "async";
    /** Configuration key of the boolean flag that makes a failed send call {@code System.exit(1)}. */
    public static final String QUIT_ON_ERROR = "quit-on-error";
    public static final String NO_ACK = "0";
    public static final String LEADER_ACK = "1";
    public static final String REPLICAS_ACK = "-1";
    public static final String DEFAULT_PARTITION = "kafka.producer.DefaultPartitioner";
    private static final Logger logger = Logger.getLogger(AbstractEventProducer.class);
    private String topic;
    private Properties producerConfig;
    private Configuration config;
    private KafkaProducer<K, V> producer;
    private boolean quitOnError;

    /**
     * Creates a producer from a properties file, using the whole file as configuration.
     *
     * @param str name of the properties file, as accepted by
     *            {@link PropertiesConfiguration#PropertiesConfiguration(String)}
     * @throws Exception if the file cannot be loaded or initialization fails
     */
    public AbstractEventProducer(String str) throws Exception {
        this(str, (String) null);
    }

    /**
     * Creates a producer from a properties file.
     *
     * @param str  name of the properties file, as accepted by
     *             {@link PropertiesConfiguration#PropertiesConfiguration(String)}
     * @param str2 when non-null, passed to {@code ConfigUtil.extractConfiguration} to select
     *             the configuration to use (presumably a key prefix — confirm against
     *             ConfigUtil); when null the whole file is used
     * @throws Exception if the file cannot be loaded or initialization fails
     */
    public AbstractEventProducer(String str, String str2) throws Exception {
        this.quitOnError = false;
        if (str2 == null) {
            initialize(new PropertiesConfiguration(str));
        } else {
            initialize(ConfigUtil.extractConfiguration(ConfigurationConverter.getProperties(new PropertiesConfiguration(str)), str2));
        }
    }

    /**
     * Creates a producer from an existing configuration.
     *
     * @param configuration source configuration
     * @param str           when non-null, passed to {@code ConfigUtil.extractConfiguration} to
     *                      select the configuration to use (presumably a key prefix — confirm
     *                      against ConfigUtil); when null {@code configuration} is used as is
     * @throws Exception if initialization fails
     */
    public AbstractEventProducer(Configuration configuration, String str) throws Exception {
        this.quitOnError = false;
        if (null == str) {
            initialize(configuration);
        } else {
            initialize(ConfigUtil.extractConfiguration(configuration, str));
        }
    }

    /**
     * Creates a producer that uses {@code configuration} as is.
     *
     * @param configuration producer configuration
     * @throws Exception if initialization fails
     */
    public AbstractEventProducer(Configuration configuration) throws Exception {
        this.quitOnError = false;
        initialize(configuration);
    }

    /**
     * Stores the configuration, reads the topic and the optional {@value #QUIT_ON_ERROR}
     * flag from it, and creates the underlying {@link KafkaProducer}.
     *
     * <p>This method is invoked from every constructor, so an overriding implementation
     * runs before the subclass's own field initializers and constructor body.
     *
     * @param configuration producer configuration; handed to
     *                      {@code ProducerConfigurator.validate} before use
     * @throws Exception propagated from validation or from creating the Kafka producer
     */
    protected void initialize(Configuration configuration) throws Exception {
        this.config = configuration;
        ProducerConfigurator.validate(configuration);
        this.producerConfig = ConfigurationConverter.getProperties(configuration);
        this.topic = configuration.getString(IProducer.PRODUCER_TOPIC);
        if (configuration.containsKey(QUIT_ON_ERROR)) {
            this.quitOnError = configuration.getBoolean(QUIT_ON_ERROR);
        }
        this.producer = new KafkaProducer<>(this.producerConfig);
        logger.info("producer initialized");
    }

    /**
     * Returns the configuration this producer was initialized with.
     *
     * @return the configuration passed to {@link #initialize(Configuration)}
     */
    public Configuration getConfig() {
        return this.config;
    }

    /**
     * Sends a single event, using {@code getKey(v)} as the record key.
     *
     * @param v         event to send; a null event is ignored
     * @param iCallback notified on completion; may be null
     */
    @Override // org.cg.eventbus.IProducer
    public void send(V v, ICallback iCallback) {
        if (null == v) {
            return;
        }
        send(getKey(v), v, iCallback);
    }

    /**
     * Sends every event of the list, in list order, through {@link #send(Object, ICallback)}.
     *
     * @param list      events to send; a null list is logged and ignored, null elements are skipped
     * @param iCallback notified once per sent event; may be null
     */
    @Override // org.cg.eventbus.IProducer
    public void send(List<V> list, ICallback iCallback) {
        if (list == null) {
            logger.error("ignore null events");
            return;
        }
        for (V event : list) {
            send(event, iCallback);
        }
    }

    /**
     * Sends one keyed event to the configured topic.
     *
     * <p>The Kafka completion callback logs any failure and, when the
     * {@value #QUIT_ON_ERROR} flag is set, terminates the JVM with status 1. Otherwise
     * {@code iCallback} (if any) is notified of both successes and failures. When Kafka
     * supplies no record metadata for a failed send, the response carries this producer's
     * topic with offset and partition set to -1.
     *
     * @param k         record key; the call is a no-op when null
     * @param v         record value; the call is a no-op when null
     * @param iCallback notified on completion; may be null
     */
    @Override // org.cg.eventbus.IProducer
    public void send(K k, V v, final ICallback iCallback) {
        if (null == k || null == v) {
            return;
        }
        this.producer.send(new ProducerRecord<K, V>(this.topic, k, v), new Callback() { // from class: org.cg.eventbus.producer.AbstractEventProducer.1
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc != null) {
                    AbstractEventProducer.logger.error("failed to send event: ", exc);
                    if (AbstractEventProducer.this.quitOnError) {
                        System.exit(1);
                    }
                }
                // Nothing to report when there is no listener, or neither metadata nor an error.
                if (iCallback == null || (recordMetadata == null && exc == null)) {
                    return;
                }
                ICallback.Response response = new ICallback.Response();
                if (recordMetadata != null) {
                    response.setTopic(recordMetadata.topic());
                    response.setOffset(recordMetadata.offset());
                    response.setPartition(recordMetadata.partition());
                } else {
                    // Some Kafka client versions pass null metadata on failure; still report
                    // the error to the caller instead of silently dropping it.
                    response.setTopic(AbstractEventProducer.this.topic);
                    response.setOffset(-1L);
                    response.setPartition(-1);
                }
                iCallback.onCompletion(response, exc);
            }
        });
    }

    /**
     * Returns the logger shared by all instances of this class.
     *
     * @return the class-level log4j logger
     */
    public Logger getLogger() {
        return logger;
    }

    /**
     * Sends events pairwise: {@code list.get(i)} is the key of {@code list2.get(i)}.
     *
     * @param list      record keys
     * @param list2     record values
     * @param iCallback notified once per sent event; may be null
     */
    @Override // org.cg.eventbus.IProducer
    public void send(List<K> list, List<V> list2, ICallback iCallback) {
        // Null or empty input is silently ignored.
        if (null == list || null == list2 || list.size() < 1 || list2.size() < 1) {
            return;
        }
        // A size mismatch means keys and values cannot be paired; nothing is sent.
        if (list.size() != list2.size()) {
            logger.error("Sizes of keys and messages are different. " + list.size() + ":" + list2.size());
            return;
        }
        for (int i = 0; i < list.size(); i++) {
            send(list.get(i), list2.get(i), iCallback);
        }
    }

    /**
     * Logs the shutdown and closes the underlying Kafka producer, if one was created.
     */
    @Override // org.cg.eventbus.IProducer
    public void close() {
        logger.info("Producer [" + this.topic + "] shutdown");
        // producer is only assigned in initialize(); guard against an override that skipped it.
        if (this.producer != null) {
            this.producer.close();
        }
    }
}
