package io.druid.server.namespace;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.StringUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.guice.ManageLifecycle;
import io.druid.query.extraction.namespace.KafkaExtractionNamespace;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;

@ManageLifecycle
/* loaded from: input_file:io/druid/server/namespace/KafkaExtractionManager.class */
public class KafkaExtractionManager {
    private final ConcurrentMap<String, String> namespaceVersionMap;
    private static final Logger log = new Logger(KafkaExtractionManager.class);
    private static final Decoder<String> defaultStringDecoder = new Decoder<String>() { // from class: io.druid.server.namespace.KafkaExtractionManager.1
        /* renamed from: fromBytes, reason: merged with bridge method [inline-methods] */
        public String m1fromBytes(byte[] bArr) {
            return StringUtils.fromUtf8(bArr);
        }
    };
    private final Properties kafkaProperties = new Properties();
    private final ConcurrentMap<String, AtomicLong> topicEvents = new ConcurrentHashMap();
    private final Collection<ListenableFuture<?>> futures = new ConcurrentLinkedQueue();
    private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("kafka-rename-consumer-%d").setDaemon(true).setPriority(1).build()));
    private final AtomicInteger backgroundTaskCount = new AtomicInteger(0);

    @Inject
    public KafkaExtractionManager(@Named("namespaceVersionMap") ConcurrentMap<String, String> concurrentMap, @Named("renameKafkaProperties") Properties properties) {
        if (properties.containsKey("group.id")) {
            throw new IAE("Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]", new Object[]{properties.getProperty("group.id")});
        }
        if (properties.containsKey("auto.offset.reset")) {
            throw new IAE("Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]", new Object[]{properties.getProperty("auto.offset.reset")});
        }
        this.kafkaProperties.putAll(properties);
        if (!this.kafkaProperties.containsKey("zookeeper.connect")) {
            this.kafkaProperties.put("zookeeper.connect", "localhost:2181/kafka");
        }
        this.kafkaProperties.setProperty("auto.offset.reset", "smallest");
        this.namespaceVersionMap = concurrentMap;
    }

    public long getBackgroundTaskCount() {
        return this.backgroundTaskCount.get();
    }

    public long getNumEvents(String str) {
        AtomicLong atomicLong;
        if (str == null || (atomicLong = this.topicEvents.get(str)) == null) {
            return 0L;
        }
        return atomicLong.get();
    }

    public void addListener(KafkaExtractionNamespace kafkaExtractionNamespace, final Map<String, String> map) {
        final String kafkaTopic = kafkaExtractionNamespace.getKafkaTopic();
        final String namespace = kafkaExtractionNamespace.getNamespace();
        Futures.addCallback(this.executorService.submit(new Runnable() { // from class: io.druid.server.namespace.KafkaExtractionManager.2
            @Override // java.lang.Runnable
            public void run() {
                Properties properties = new Properties();
                properties.putAll(KafkaExtractionManager.this.kafkaProperties);
                properties.setProperty("group.id", UUID.randomUUID().toString());
                List createMessageStreamsByFilter = new ZookeeperConsumerConnector(new ConsumerConfig(properties)).createMessageStreamsByFilter(new Whitelist(Pattern.quote(kafkaTopic)), 1, KafkaExtractionManager.defaultStringDecoder, KafkaExtractionManager.defaultStringDecoder);
                if (createMessageStreamsByFilter == null || createMessageStreamsByFilter.isEmpty()) {
                    throw new IAE("Topic [%s] had no streams", new Object[]{kafkaTopic});
                }
                if (createMessageStreamsByFilter.size() > 1) {
                    throw new ISE("Topic [%s] has %d streams! expected 1", new Object[]{kafkaTopic, Integer.valueOf(createMessageStreamsByFilter.size())});
                }
                KafkaExtractionManager.this.backgroundTaskCount.incrementAndGet();
                ConsumerIterator it = ((KafkaStream) createMessageStreamsByFilter.get(0)).iterator();
                KafkaExtractionManager.log.info("Listening to topic [%s] for namespace [%s]", new Object[]{kafkaTopic, namespace});
                AtomicLong atomicLong = (AtomicLong) KafkaExtractionManager.this.topicEvents.get(namespace);
                if (atomicLong == null) {
                    KafkaExtractionManager.this.topicEvents.putIfAbsent(namespace, new AtomicLong(0L));
                    atomicLong = (AtomicLong) KafkaExtractionManager.this.topicEvents.get(namespace);
                }
                while (it.hasNext()) {
                    MessageAndMetadata next = it.next();
                    String str = (String) next.key();
                    String str2 = (String) next.message();
                    if (str == null || str2 == null) {
                        KafkaExtractionManager.log.error("Bad key/message from topic [%s]: [%s]", new Object[]{kafkaTopic, next});
                    } else {
                        map.put(str, str2);
                        KafkaExtractionManager.this.namespaceVersionMap.put(namespace, Long.toString(atomicLong.incrementAndGet()));
                        KafkaExtractionManager.log.debug("Placed key[%s] val[%s]", new Object[]{str, str2});
                    }
                }
            }
        }), new FutureCallback<Object>() { // from class: io.druid.server.namespace.KafkaExtractionManager.3
            public void onSuccess(Object obj) {
                KafkaExtractionManager.this.topicEvents.remove(namespace);
            }

            public void onFailure(Throwable th) {
                KafkaExtractionManager.this.topicEvents.remove(namespace);
                if (th instanceof CancellationException) {
                    KafkaExtractionManager.log.warn("Cancelled rename task for topic [%s]", new Object[]{kafkaTopic});
                } else {
                    Throwables.propagate(th);
                }
            }
        }, MoreExecutors.sameThreadExecutor());
    }

    @LifecycleStart
    public void start() {
    }

    @LifecycleStop
    public void stop() {
        this.executorService.shutdown();
        Futures.allAsList(this.futures).cancel(true);
    }
}
