package org.apache.hadoop.metrics2.sink;

import com.google.common.base.Strings;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/hadoop/metrics2/sink/KafkaSink.class */
public class KafkaSink implements MetricsSink, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
    public static final String BROKER_LIST = "broker_list";
    public static final String TOPIC = "topic";
    private String hostname = null;
    private String brokerList = null;
    private String topic = null;
    private Producer<Integer, byte[]> producer = null;
    private final DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private final DateTimeFormatter timeFormat = DateTimeFormatter.ofPattern("HH:mm:ss");
    private final ZoneId zoneId = ZoneId.systemDefault();

    public void setProducer(Producer<Integer, byte[]> producer) {
        this.producer = producer;
    }

    public void init(SubsetConfiguration subsetConfiguration) {
        Properties properties = new Properties();
        this.brokerList = subsetConfiguration.getString(BROKER_LIST);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Broker list " + this.brokerList);
        }
        properties.put("bootstrap.servers", this.brokerList);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Kafka brokers: " + this.brokerList);
        }
        this.topic = subsetConfiguration.getString(TOPIC);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Kafka topic " + this.topic);
        }
        if (Strings.isNullOrEmpty(this.topic)) {
            throw new MetricsException("Kafka topic can not be null");
        }
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("request.required.acks", "0");
        this.hostname = "null";
        try {
            this.hostname = InetAddress.getLocalHost().getHostName();
        } catch (Exception e) {
            LOG.warn("Error getting Hostname, going to continue");
        }
        try {
            this.producer = new KafkaProducer(properties);
        } catch (Exception e2) {
            throw new MetricsException("Error creating Producer, " + this.brokerList, e2);
        }
    }

    public void putMetrics(MetricsRecord metricsRecord) {
        if (this.producer == null) {
            throw new MetricsException("Producer in KafkaSink is null!");
        }
        StringBuilder sb = new StringBuilder();
        long timestamp = metricsRecord.timestamp();
        LocalDateTime ofInstant = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), this.zoneId);
        String format = ofInstant.format(this.dateFormat);
        String format2 = ofInstant.format(this.timeFormat);
        sb.append("{\"hostname\": \"" + this.hostname);
        sb.append("\", \"timestamp\": " + timestamp);
        sb.append(", \"date\": \"" + format);
        sb.append("\",\"time\": \"" + format2);
        sb.append("\",\"name\": \"" + metricsRecord.name() + "\" ");
        for (MetricsTag metricsTag : metricsRecord.tags()) {
            sb.append(", \"" + metricsTag.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
            sb.append(" \"" + metricsTag.value().toString() + "\"");
        }
        for (AbstractMetric abstractMetric : metricsRecord.metrics()) {
            sb.append(", \"" + abstractMetric.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
            sb.append(" \"" + abstractMetric.value().toString() + "\"");
        }
        sb.append("}");
        LOG.debug("kafka message: " + sb.toString());
        Future send = this.producer.send(new ProducerRecord(this.topic, sb.toString().getBytes(Charset.forName("UTF-8"))));
        sb.setLength(0);
        try {
            send.get();
        } catch (InterruptedException e) {
            throw new MetricsException("Error sending data", e);
        } catch (ExecutionException e2) {
            throw new MetricsException("Error sending data", e2);
        }
    }

    public void flush() {
        LOG.debug("Kafka seems not to have any flush() mechanism!");
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            try {
                this.producer.close();
                this.producer = null;
            } catch (RuntimeException e) {
                throw new MetricsException("Error closing producer", e);
            }
        } catch (Throwable th) {
            this.producer = null;
            throw th;
        }
    }
}
