package net.kut3.logging.kafka;

import java.util.List;
import net.kut3.app.MainApplication;
import net.kut3.config.Config;
import net.kut3.logging.LogWriter;
import net.kut3.logging.kafka.LogWriterConfig;
import net.kut3.util.Strings;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/kut3/logging/kafka/KafkaWriter.class */
public class KafkaWriter implements LogWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaWriter.class);
    private LogWriterConfig config;
    private Producer<String, String> producer;
    private volatile boolean closed = true;

    public KafkaWriter(String str, String str2) {
        if (Strings.isNullOrBlank(str)) {
            throw new IllegalArgumentException("[servers]servers");
        }
        if (Strings.isNullOrBlank(str2)) {
            throw new IllegalArgumentException("[servers]client-id");
        }
        String property = System.getProperty(MainApplication.APP_NAME_PROPERTY);
        if (Strings.isNullOrBlank(property)) {
            throw new IllegalArgumentException("application.name nullOrEmpty");
        }
        LogWriterConfig.Builder clientId = new LogWriterConfig.Builder().setTopic(property).setClientId(str2);
        for (String str3 : str.split(Config.DEFAULT_LIST_SEPARATOR)) {
            String[] split = str3.split(":");
            clientId.addHostPort(split[0], Integer.parseInt(split[1]));
        }
        init(clientId.build());
    }

    public KafkaWriter(LogWriterConfig logWriterConfig) {
        init(logWriterConfig);
    }

    @Override // net.kut3.logging.LogWriter
    public void write(String str) {
        if (this.closed) {
            LOGGER.warn("Producer already closed: " + str);
            return;
        }
        try {
            this.producer.send(new ProducerRecord(this.config.builder.topic, this.config.builder.partition, (Object) null, str));
        } catch (Throwable th) {
            LOGGER.error(str, th);
        }
    }

    private void init(LogWriterConfig logWriterConfig) {
        this.config = logWriterConfig;
        this.producer = new KafkaProducer(this.config.toProperties());
        if (null != logWriterConfig.builder.partition) {
            List partitionsFor = this.producer.partitionsFor(this.config.builder.topic);
            if (logWriterConfig.builder.partition.intValue() > 0 && logWriterConfig.builder.partition.intValue() > partitionsFor.size() - 1) {
                this.producer.close();
                this.closed = true;
                throw new IllegalArgumentException("Topic '" + this.config.builder.topic + "' has only " + partitionsFor.size() + " partition(s)");
            }
        }
        this.closed = false;
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.producer.flush();
            this.producer.close();
        }));
    }
}
