package io.druid.query.lookup;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.StringUtils;
import com.metamx.common.logger.Logger;
import io.druid.concurrent.Execs;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.validation.constraints.Min;
import javax.ws.rs.GET;
import javax.ws.rs.core.Response;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;

@JsonTypeName("kafka")
/* loaded from: input_file:io/druid/query/lookup/KafkaLookupExtractorFactory.class */
public class KafkaLookupExtractorFactory implements LookupExtractorFactory {
    private static final Logger LOG = new Logger(KafkaLookupExtractorFactory.class);
    static final Decoder<String> DEFAULT_STRING_DECODER = new Decoder<String>() { // from class: io.druid.query.lookup.KafkaLookupExtractorFactory.1
        /* renamed from: fromBytes, reason: merged with bridge method [inline-methods] */
        public String m2fromBytes(byte[] bArr) {
            return StringUtils.fromUtf8(bArr);
        }
    };
    private final ListeningExecutorService executorService;
    private final AtomicLong doubleEventCount;
    private final NamespaceExtractionCacheManager cacheManager;
    private final String factoryId;
    private final AtomicReference<Map<String, String>> mapRef;
    private final AtomicBoolean started;
    private volatile ListenableFuture<?> future;

    @JsonProperty
    private final String kafkaTopic;

    @JsonProperty
    private final Map<String, String> kafkaProperties;

    @JsonProperty
    private final long connectTimeout;

    @JsonProperty
    private final boolean injective;

    /* loaded from: input_file:io/druid/query/lookup/KafkaLookupExtractorFactory$KafkaLookupExtractorIntrospectionHandler.class */
    class KafkaLookupExtractorIntrospectionHandler implements LookupIntrospectHandler {
        KafkaLookupExtractorIntrospectionHandler() {
        }

        @GET
        public Response getActive() {
            ListenableFuture<?> future = KafkaLookupExtractorFactory.this.getFuture();
            return (future == null || future.isDone()) ? Response.status(Response.Status.GONE).build() : Response.ok().build();
        }
    }

    @JsonCreator
    public KafkaLookupExtractorFactory(@JacksonInject NamespaceExtractionCacheManager namespaceExtractionCacheManager, @JsonProperty("kafkaTopic") String str, @JsonProperty("kafkaProperties") Map<String, String> map, @JsonProperty("connectTimeout") @Min(0) long j, @JsonProperty("injective") boolean z) {
        this.doubleEventCount = new AtomicLong(0L);
        this.factoryId = UUID.randomUUID().toString();
        this.mapRef = new AtomicReference<>(null);
        this.started = new AtomicBoolean(false);
        this.future = null;
        this.kafkaTopic = (String) Preconditions.checkNotNull(str, "kafkaTopic required");
        this.kafkaProperties = (Map) Preconditions.checkNotNull(map, "kafkaProperties required");
        this.executorService = MoreExecutors.listeningDecorator(Execs.singleThreaded("kafka-factory-" + str + "-%s", 1));
        this.cacheManager = namespaceExtractionCacheManager;
        this.connectTimeout = j;
        this.injective = z;
    }

    public KafkaLookupExtractorFactory(NamespaceExtractionCacheManager namespaceExtractionCacheManager, String str, Map<String, String> map) {
        this(namespaceExtractionCacheManager, str, map, 0L, false);
    }

    public String getKafkaTopic() {
        return this.kafkaTopic;
    }

    public Map<String, String> getKafkaProperties() {
        return this.kafkaProperties;
    }

    public long getConnectTimeout() {
        return this.connectTimeout;
    }

    public boolean isInjective() {
        return this.injective;
    }

    public boolean start() {
        synchronized (this.started) {
            if (this.started.get()) {
                LOG.warn("Already started, not starting again", new Object[0]);
                return this.started.get();
            }
            if (this.executorService.isShutdown()) {
                LOG.warn("Already shut down, not starting again", new Object[0]);
                return false;
            }
            final Properties properties = new Properties();
            properties.putAll(getKafkaProperties());
            if (properties.containsKey("group.id")) {
                throw new IAE("Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]", new Object[]{properties.getProperty("group.id")});
            }
            if (properties.containsKey("auto.offset.reset")) {
                throw new IAE("Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]", new Object[]{properties.getProperty("auto.offset.reset")});
            }
            Preconditions.checkNotNull(properties.getProperty("zookeeper.connect"), "zookeeper.connect required property");
            properties.setProperty("group.id", this.factoryId);
            final String kafkaTopic = getKafkaTopic();
            LOG.debug("About to listen to topic [%s] with group.id [%s]", new Object[]{kafkaTopic, this.factoryId});
            final ConcurrentMap cacheMap = this.cacheManager.getCacheMap(this.factoryId);
            this.mapRef.set(cacheMap);
            properties.setProperty("auto.offset.reset", "smallest");
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            ListenableFuture<?> submit = this.executorService.submit(new Runnable() { // from class: io.druid.query.lookup.KafkaLookupExtractorFactory.2
                @Override // java.lang.Runnable
                public void run() {
                    List createMessageStreamsByFilter;
                    while (!KafkaLookupExtractorFactory.this.executorService.isShutdown() && !Thread.currentThread().isInterrupted()) {
                        ConsumerConnector buildConnector = KafkaLookupExtractorFactory.this.buildConnector(properties);
                        try {
                            try {
                                createMessageStreamsByFilter = buildConnector.createMessageStreamsByFilter(new Whitelist(Pattern.quote(kafkaTopic)), 1, KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER, KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER);
                            } catch (Exception e) {
                                KafkaLookupExtractorFactory.LOG.error(e, "Error reading stream for topic [%s]", new Object[]{kafkaTopic});
                                buildConnector.shutdown();
                            }
                            if (createMessageStreamsByFilter == null || createMessageStreamsByFilter.isEmpty()) {
                                throw new IAE("Topic [%s] had no streams", new Object[]{kafkaTopic});
                            }
                            if (createMessageStreamsByFilter.size() > 1) {
                                throw new ISE("Topic [%s] has %d streams! expected 1", new Object[]{kafkaTopic, Integer.valueOf(createMessageStreamsByFilter.size())});
                            }
                            KafkaStream kafkaStream = (KafkaStream) createMessageStreamsByFilter.get(0);
                            countDownLatch.countDown();
                            ConsumerIterator it = kafkaStream.iterator();
                            while (it.hasNext()) {
                                MessageAndMetadata messageAndMetadata = (MessageAndMetadata) it.next();
                                String str = (String) messageAndMetadata.key();
                                String str2 = (String) messageAndMetadata.message();
                                if (str == null || str2 == null) {
                                    KafkaLookupExtractorFactory.LOG.error("Bad key/message from topic [%s]: [%s]", new Object[]{kafkaTopic, messageAndMetadata});
                                } else {
                                    KafkaLookupExtractorFactory.this.doubleEventCount.incrementAndGet();
                                    cacheMap.put(str, str2);
                                    KafkaLookupExtractorFactory.this.doubleEventCount.incrementAndGet();
                                    KafkaLookupExtractorFactory.LOG.trace("Placed key[%s] val[%s]", new Object[]{str, str2});
                                }
                            }
                            buildConnector.shutdown();
                        } catch (Throwable th) {
                            buildConnector.shutdown();
                            throw th;
                        }
                    }
                }
            });
            Futures.addCallback(submit, new FutureCallback<Object>() { // from class: io.druid.query.lookup.KafkaLookupExtractorFactory.3
                public void onSuccess(Object obj) {
                    KafkaLookupExtractorFactory.LOG.debug("Success listening to [%s]", new Object[]{kafkaTopic});
                }

                public void onFailure(Throwable th) {
                    if (th instanceof CancellationException) {
                        KafkaLookupExtractorFactory.LOG.debug("Topic [%s] cancelled", new Object[]{kafkaTopic});
                    } else {
                        KafkaLookupExtractorFactory.LOG.error(th, "Error in listening to [%s]", new Object[]{kafkaTopic});
                    }
                }
            }, MoreExecutors.sameThreadExecutor());
            this.future = submit;
            Stopwatch createStarted = Stopwatch.createStarted();
            while (!countDownLatch.await(100L, TimeUnit.MILLISECONDS) && this.connectTimeout > 0) {
                try {
                    if (submit.isDone()) {
                        submit.get();
                    } else if (createStarted.elapsed(TimeUnit.MILLISECONDS) > this.connectTimeout) {
                        throw new TimeoutException("Failed to connect to kafka in sufficient time");
                    }
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    if (!submit.isDone() && !submit.cancel(true) && !submit.isDone()) {
                        LOG.warn("Could not cancel kafka listening thread", new Object[0]);
                    }
                    LOG.error(e, "Failed to start kafka extraction factory", new Object[0]);
                    this.cacheManager.delete(this.factoryId);
                    return false;
                }
            }
            this.started.set(true);
            return true;
        }
    }

    ConsumerConnector buildConnector(Properties properties) {
        return new ZookeeperConsumerConnector(new ConsumerConfig(properties));
    }

    public boolean close() {
        synchronized (this.started) {
            if (!this.started.get() || this.executorService.isShutdown()) {
                LOG.info("Already shutdown, ignoring", new Object[0]);
                return !this.started.get();
            }
            this.started.set(false);
            this.executorService.shutdownNow();
            ListenableFuture<?> listenableFuture = this.future;
            if (listenableFuture != null && !listenableFuture.isDone() && !listenableFuture.cancel(true) && !listenableFuture.isDone()) {
                LOG.error("Error cancelling future for topic [%s]", new Object[]{getKafkaTopic()});
                return false;
            }
            if (this.cacheManager.delete(this.factoryId)) {
                return true;
            }
            LOG.error("Error removing [%s] for topic [%s] from cache", new Object[]{this.factoryId, getKafkaTopic()});
            return false;
        }
    }

    public boolean replaces(@Nullable LookupExtractorFactory lookupExtractorFactory) {
        if (this == lookupExtractorFactory || lookupExtractorFactory == null) {
            return false;
        }
        if (getClass() != lookupExtractorFactory.getClass()) {
            return true;
        }
        KafkaLookupExtractorFactory kafkaLookupExtractorFactory = (KafkaLookupExtractorFactory) lookupExtractorFactory;
        return (getKafkaTopic().equals(kafkaLookupExtractorFactory.getKafkaTopic()) && getKafkaProperties().equals(kafkaLookupExtractorFactory.getKafkaProperties()) && getConnectTimeout() == kafkaLookupExtractorFactory.getConnectTimeout() && isInjective() == kafkaLookupExtractorFactory.isInjective()) ? false : true;
    }

    @Nullable
    public LookupIntrospectHandler getIntrospectHandler() {
        return new KafkaLookupExtractorIntrospectionHandler();
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public LookupExtractor m1get() {
        Map map = (Map) Preconditions.checkNotNull(this.mapRef.get(), "Not started");
        final long j = this.doubleEventCount.get();
        return new MapLookupExtractor(map, isInjective()) { // from class: io.druid.query.lookup.KafkaLookupExtractorFactory.4
            public byte[] getCacheKey() {
                byte[] utf8 = StringUtils.toUtf8(KafkaLookupExtractorFactory.this.factoryId);
                if (j == KafkaLookupExtractorFactory.this.doubleEventCount.get()) {
                    return ByteBuffer.allocate(utf8.length + 1 + 8).put(utf8).put((byte) -1).putLong(j).array();
                }
                byte[] utf82 = StringUtils.toUtf8(UUID.randomUUID().toString());
                return ByteBuffer.allocate(utf8.length + 1 + utf82.length + 1).put(utf8).put((byte) -1).put(utf82).put((byte) -1).array();
            }
        };
    }

    public long getCompletedEventCount() {
        return this.doubleEventCount.get() >> 1;
    }

    NamespaceExtractionCacheManager getCacheManager() {
        return this.cacheManager;
    }

    AtomicReference<Map<String, String>> getMapRef() {
        return this.mapRef;
    }

    AtomicLong getDoubleEventCount() {
        return this.doubleEventCount;
    }

    ListenableFuture<?> getFuture() {
        return this.future;
    }
}
