package io.druid.firehose.kafka;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Committer;
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.firehose.kafka.KafkaSimpleConsumer;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.class */
public class KafkaEightSimpleConsumerFirehoseFactory implements FirehoseFactoryV2<ByteBufferInputRowParser> {
    private static final EmittingLogger log = new EmittingLogger(KafkaEightSimpleConsumerFirehoseFactory.class);

    /** Kafka brokers handed to every {@link KafkaSimpleConsumer}; the constructor rejects null/empty. */
    @JsonProperty
    private final List<String> brokerList;

    /** Partitions to consume; {@code connect} creates one worker per entry. The constructor rejects null/empty. */
    @JsonProperty
    private final List<Integer> partitionIdList;

    /** Client id handed to every {@link KafkaSimpleConsumer}; the constructor rejects null/empty. */
    @JsonProperty
    private final String clientId;

    /** Topic ("feed") to read from; the constructor rejects null/empty. */
    @JsonProperty
    private final String feed;

    /** Capacity of the queue shared by the partition workers and the firehose. */
    @JsonProperty
    private final int queueBufferLength;

    /**
     * Passed to {@link KafkaSimpleConsumer} as its last constructor argument.
     * NOTE(review): this field is exposed under the JSON name "earliest" while the creator reads
     * "resetOffsetToEarliest" -- confirm the two names are meant to differ.
     */
    @JsonProperty
    private final boolean earliest;

    /** Partition workers created by {@code connect}; closed when the returned firehose is closed. */
    private final List<PartitionConsumerWorker> consumerWorkers = new CopyOnWriteArrayList<>();

    /** Queue capacity used when "queueBufferLength" is not supplied. */
    private static final int DEFAULT_QUEUE_BUFFER_LENGTH = 20000;

    /** Second argument of {@code KafkaSimpleConsumer.fetch}; presumably milliseconds -- TODO confirm. */
    private static final int CONSUMER_FETCH_TIMEOUT = 10000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory$PartitionConsumerWorker.class */
    public static class PartitionConsumerWorker implements Closeable {
        private final String topic;
        private final KafkaSimpleConsumer consumer;
        private final int partitionId;
        private final long startOffset;
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        private volatile Thread thread = null;

        PartitionConsumerWorker(String str, KafkaSimpleConsumer kafkaSimpleConsumer, int i, long j) {
            this.topic = str;
            this.consumer = kafkaSimpleConsumer;
            this.partitionId = i;
            this.startOffset = j;
        }

        public void go(final LinkedBlockingQueue<KafkaSimpleConsumer.BytesMessageWithOffset> linkedBlockingQueue) {
            this.thread = new Thread() { // from class: io.druid.firehose.kafka.KafkaEightSimpleConsumerFirehoseFactory.PartitionConsumerWorker.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    long j = PartitionConsumerWorker.this.startOffset;
                    KafkaEightSimpleConsumerFirehoseFactory.log.info("Start running parition[%s], offset[%s]", new Object[]{Integer.valueOf(PartitionConsumerWorker.this.partitionId), Long.valueOf(j)});
                    while (!PartitionConsumerWorker.this.stopped.get()) {
                        try {
                            try {
                                try {
                                    int i = 0;
                                    for (KafkaSimpleConsumer.BytesMessageWithOffset bytesMessageWithOffset : PartitionConsumerWorker.this.consumer.fetch(j, KafkaEightSimpleConsumerFirehoseFactory.CONSUMER_FETCH_TIMEOUT)) {
                                        j = bytesMessageWithOffset.offset();
                                        linkedBlockingQueue.put(bytesMessageWithOffset);
                                        i++;
                                    }
                                    KafkaEightSimpleConsumerFirehoseFactory.log.debug("fetch [%s] msgs for partition [%s] in one time ", new Object[]{Integer.valueOf(i), Integer.valueOf(PartitionConsumerWorker.this.partitionId)});
                                } catch (InterruptedException e) {
                                    KafkaEightSimpleConsumerFirehoseFactory.log.info("Interrupted when fetching data, shutting down.", new Object[0]);
                                    PartitionConsumerWorker.this.consumer.stop();
                                    return;
                                }
                            } catch (Exception e2) {
                                KafkaEightSimpleConsumerFirehoseFactory.log.error(e2, "Exception happened in fetching data, but will continue consuming", new Object[0]);
                            }
                        } finally {
                            PartitionConsumerWorker.this.consumer.stop();
                        }
                    }
                }
            };
            this.thread.setDaemon(true);
            this.thread.setName(String.format("kafka-%s-%s", this.topic, Integer.valueOf(this.partitionId)));
            this.thread.start();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public synchronized void close() throws IOException {
            if (this.stopped.compareAndSet(false, true)) {
                this.thread.interrupt();
                this.thread = null;
            }
        }
    }

    /**
     * Builds the factory from its JSON configuration.
     *
     * @param brokerList            Kafka brokers; must be non-null and non-empty
     * @param partitionIdList       partitions to consume; must be non-null and non-empty
     * @param clientId              Kafka client id; must be non-null and non-empty
     * @param feed                  topic to read; must be non-null and non-empty
     * @param queueBufferLength     capacity of the message queue; {@code null} selects
     *                              {@code DEFAULT_QUEUE_BUFFER_LENGTH}, otherwise must be positive
     * @param resetOffsetToEarliest forwarded to the consumers; {@code null} is treated as {@code true}
     * @throws IllegalArgumentException if any of the checks above fails
     */
    @JsonCreator
    public KafkaEightSimpleConsumerFirehoseFactory(
            @JsonProperty("brokerList") List<String> brokerList,
            @JsonProperty("partitionIdList") List<Integer> partitionIdList,
            @JsonProperty("clientId") String clientId,
            @JsonProperty("feed") String feed,
            @JsonProperty("queueBufferLength") Integer queueBufferLength,
            @JsonProperty("resetOffsetToEarliest") Boolean resetOffsetToEarliest) {
        Preconditions.checkArgument(brokerList != null && !brokerList.isEmpty(), "brokerList is null/empty");
        this.brokerList = brokerList;

        Preconditions.checkArgument(
                partitionIdList != null && !partitionIdList.isEmpty(), "partitionIdList is null/empty");
        this.partitionIdList = partitionIdList;

        Preconditions.checkArgument(clientId != null && !clientId.isEmpty(), "clientId is null/empty");
        this.clientId = clientId;

        Preconditions.checkArgument(feed != null && !feed.isEmpty(), "feed is null/empty");
        this.feed = feed;

        this.queueBufferLength = queueBufferLength == null ? DEFAULT_QUEUE_BUFFER_LENGTH : queueBufferLength;
        // Validate the resolved value: unboxing the raw parameter would throw a
        // NullPointerException whenever the property is omitted and the default applies.
        Preconditions.checkArgument(this.queueBufferLength > 0, "queueBufferLength must be positive number");
        log.info("queueBufferLength loaded as[%s]", this.queueBufferLength);

        this.earliest = resetOffsetToEarliest == null || resetOffsetToEarliest;
        log.info(
                "if old offsets are not known, data from partition will be read from [%s] available offset.",
                this.earliest ? "earliest" : "latest");
    }

    /**
     * Converts the metadata of a previous commit into a partition-to-offset map.
     *
     * <p>Keys and values are read through their string form, so both numeric and string
     * representations are accepted. Entries that cannot be parsed (including null keys or values)
     * are logged and skipped.
     *
     * @param lastCommit metadata of the last commit; may be {@code null}
     * @return a new mutable map, empty when {@code lastCommit} is {@code null} or is not a {@link Map}
     */
    private Map<Integer, Long> loadOffsetFromPreviousMetaData(Object lastCommit) {
        final Map<Integer, Long> offsets = Maps.newHashMap();
        if (lastCommit == null) {
            return offsets;
        }
        if (!(lastCommit instanceof Map)) {
            // An alert that is only built is never delivered; it has to be emitted.
            log.makeAlert("Unable to cast lastCommit to Map for feed [%s]", feed).emit();
            return offsets;
        }

        for (Map.Entry<?, ?> entry : ((Map<?, ?>) lastCommit).entrySet()) {
            try {
                // String.valueOf maps null to "null", which then fails parsing and is handled
                // below like any other malformed entry instead of throwing a NullPointerException.
                final int partition = Integer.parseInt(String.valueOf(entry.getKey()));
                final long offset = Long.parseLong(String.valueOf(entry.getValue()));
                log.debug("Recover last commit information partitionId [%s], offset [%s]", partition, offset);
                offsets.put(partition, offset);
            } catch (NumberFormatException e) {
                log.error(e, "Fail to load offset from previous meta data [%s]", entry);
            }
        }
        log.info("Loaded offset map[%s]", offsets);
        return offsets;
    }

    /**
     * Creates and starts one consumer worker per configured partition and returns a firehose that
     * reads the messages those workers enqueue.
     *
     * <p>Partitions that have an entry in {@code lastCommit} start from that offset; all others
     * start from offset 0. The workers belong to the returned firehose: closing it closes exactly
     * the workers created by this call.
     *
     * @param parser     parser applied to each non-null message payload
     * @param lastCommit metadata of the previous commit (see {@code loadOffsetFromPreviousMetaData});
     *                   may be {@code null}
     * @return a firehose backed by a bounded queue of {@code queueBufferLength} messages
     */
    @Override
    public FirehoseV2 connect(final ByteBufferInputRowParser parser, Object lastCommit) throws IOException {
        final Map<Integer, Long> lastOffsets = loadOffsetFromPreviousMetaData(lastCommit);

        // Workers are tracked per connection so that a second connect() neither restarts workers of
        // an earlier firehose nor has them closed by the wrong firehose.
        final List<PartitionConsumerWorker> workers = new CopyOnWriteArrayList<>();
        for (Integer partition : partitionIdList) {
            final KafkaSimpleConsumer consumer =
                    new KafkaSimpleConsumer(feed, partition.intValue(), clientId, brokerList, earliest);
            final Long previousOffset = lastOffsets.get(partition);
            final long startOffset = previousOffset == null ? 0L : previousOffset.longValue();
            workers.add(new PartitionConsumerWorker(feed, consumer, partition.intValue(), startOffset));
        }
        consumerWorkers.addAll(workers);

        final LinkedBlockingQueue<KafkaSimpleConsumer.BytesMessageWithOffset> messageQueue =
                new LinkedBlockingQueue<>(queueBufferLength);

        log.info("Kicking off all consumers");
        for (PartitionConsumerWorker worker : workers) {
            worker.go(messageQueue);
        }
        log.info("All consumer started");

        return new FirehoseV2() {
            private volatile boolean stopped;
            private volatile KafkaSimpleConsumer.BytesMessageWithOffset msg = null;
            private volatile InputRow row = null;
            // Offsets of fully consumed messages, seeded with the offsets recovered from lastCommit.
            private final Map<Integer, Long> lastOffsetPartitions = Maps.newHashMap(lastOffsets);

            @Override
            public void start() throws Exception {
                nextMessage();
            }

            @Override
            public boolean advance() {
                if (stopped) {
                    return false;
                }
                nextMessage();
                return true;
            }

            /**
             * Blocks until a message with a parseable payload is available and makes it the current
             * row. On interruption the current row is left as {@code null} and the interrupt flag
             * is restored.
             */
            private void nextMessage() {
                try {
                    row = null;
                    while (row == null) {
                        if (msg != null) {
                            // The previous message is done with; record its offset for the next commit.
                            lastOffsetPartitions.put(
                                    Integer.valueOf(msg.getPartition()), Long.valueOf(msg.offset()));
                        }
                        msg = messageQueue.take();
                        final byte[] payload = msg.message();
                        // Messages without a payload are skipped.
                        row = payload == null ? null : parser.parse(ByteBuffer.wrap(payload));
                    }
                } catch (InterruptedException e) {
                    log.warn(e, "Thread Interrupted while taking from queue, propagating the interrupt");
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public InputRow currRow() {
                if (stopped) {
                    return null;
                }
                return row;
            }

            @Override
            public Committer makeCommitter() {
                // Snapshot, so later reads do not change what this committer reports.
                final Map<Integer, Long> offsets = Maps.newHashMap(lastOffsetPartitions);
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return offsets;
                    }

                    @Override
                    public void run() {
                        // Nothing to do: the offsets are carried in the metadata only.
                    }
                };
            }

            @Override
            public void close() throws IOException {
                log.info("Stopping kafka 0.8 simple firehose");
                stopped = true;
                for (PartitionConsumerWorker worker : workers) {
                    Closeables.close(worker, true);
                }
                // Drop the closed workers from the factory-wide list so they are not kept alive.
                consumerWorkers.removeAll(workers);
            }
        };
    }
}
