package org.apache.james.mailbox.kafka;

import java.util.Properties;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.james.mailbox.store.publisher.Publisher;
import org.apache.james.mailbox.store.publisher.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/james/mailbox/kafka/KafkaPublisher.class */
public class KafkaPublisher implements Publisher {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaPublisher.class);
    private Producer<String, byte[]> producer;
    private final int kafka_port;
    private final String kafka_ip;
    private boolean producerLaunched = false;

    public KafkaPublisher(String str, int i) {
        this.kafka_ip = str;
        this.kafka_port = i;
    }

    @PostConstruct
    public void init() {
        if (this.producerLaunched) {
            LOG.warn("Kafka producer was already instantiated");
            return;
        }
        Properties properties = new Properties();
        properties.put("metadata.broker.list", this.kafka_ip + ":" + this.kafka_port);
        properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.put("request.required.acks", "1");
        this.producer = new Producer<>(new ProducerConfig(properties));
        this.producerLaunched = true;
    }

    public void publish(Topic topic, byte[] bArr) {
        this.producer.send(new KeyedMessage(topic.getValue(), bArr));
    }

    @PreDestroy
    public void close() {
        this.producer.close();
    }
}
