package org.apache.james.mailbox.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.james.mailbox.store.publisher.MessageConsumer;
import org.apache.james.mailbox.store.publisher.MessageReceiver;
import org.apache.james.mailbox.store.publisher.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/james/mailbox/kafka/KafkaMessageConsumer.class */
public class KafkaMessageConsumer implements MessageConsumer {
    private static final String ZK_SESSION_TIMEOUT = "400";
    private static final String ZK_SYNC_TIME = "200";
    private static final String AUTO_COMMIT8INTERVAL_MS = "1000";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageConsumer.class);
    private final ConsumerConnector consumer;
    private final int numberOfTread;
    private MessageReceiver messageReceiver;
    private ExecutorService executor;
    private boolean isInitialized = false;

    /* loaded from: input_file:org/apache/james/mailbox/kafka/KafkaMessageConsumer$Consumer.class */
    private class Consumer implements Runnable {
        private final KafkaStream<byte[], byte[]> m_stream;

        public Consumer(KafkaStream<byte[], byte[]> kafkaStream) {
            this.m_stream = kafkaStream;
        }

        @Override // java.lang.Runnable
        public void run() {
            ConsumerIterator it = this.m_stream.iterator();
            while (it.hasNext()) {
                KafkaMessageConsumer.this.messageReceiver.receiveSerializedEvent((byte[]) ((MessageAndMetadata) it.next()).message());
            }
        }
    }

    public KafkaMessageConsumer(String str, String str2, int i) {
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(str, str2));
        this.numberOfTread = i;
    }

    public void setMessageReceiver(MessageReceiver messageReceiver) {
        if (this.isInitialized) {
            throw new RuntimeException("Can not change the MessageReceiver of a running KafkaMessageConsumer");
        }
        this.messageReceiver = messageReceiver;
    }

    @PreDestroy
    public void destroy() {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
        this.isInitialized = false;
    }

    @PostConstruct
    public void init(Topic topic) {
        if (this.isInitialized) {
            LOG.warn("This Kafka MailboxMessage Receiver was already launched.");
            return;
        }
        this.isInitialized = true;
        List<KafkaStream<byte[], byte[]>> kafkaStreams = getKafkaStreams(topic.getValue());
        this.executor = Executors.newFixedThreadPool(this.numberOfTread);
        startConsuming(kafkaStreams);
    }

    private List<KafkaStream<byte[], byte[]>> getKafkaStreams(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put(str, Integer.valueOf(this.numberOfTread));
        return (List) this.consumer.createMessageStreams(hashMap).get(str);
    }

    private void startConsuming(List<KafkaStream<byte[], byte[]>> list) {
        list.forEach(kafkaStream -> {
            this.executor.submit(new Consumer(kafkaStream));
        });
    }

    private ConsumerConfig createConsumerConfig(String str, String str2) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", str);
        properties.put("group.id", str2);
        properties.put("zookeeper.session.timeout.ms", ZK_SESSION_TIMEOUT);
        properties.put("zookeeper.sync.time.ms", ZK_SYNC_TIME);
        properties.put("auto.commit.interval.ms", AUTO_COMMIT8INTERVAL_MS);
        return new ConsumerConfig(properties);
    }
}
