package io.druid.tests.indexer;

import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.guice.DruidTestModuleFactory;
import io.druid.testing.utils.RetryUtil;
import io.druid.testing.utils.TestQueryHelper;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminUtils;
import kafka.common.TopicExistsException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.IOUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

/**
 * Integration test that feeds events through Kafka into a Druid indexing task and then
 * verifies the ingested data with queries.
 *
 * <p>The test creates the Kafka topic, submits the task described by {@link #INDEXER_FILE},
 * publishes one event every {@link #DELAY_BETWEEN_EVENTS_SECS} seconds for roughly
 * {@link #MINUTES_TO_SEND} minutes (plus 30 seconds), and runs the queries from
 * {@link #QUERIES_FILE} both before and after the coordinator reports the segments as loaded.
 */
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKafkaTest extends AbstractIndexerTest {
    private static final Logger LOG = new Logger(ITKafkaTest.class);
    private static final int DELAY_BETWEEN_EVENTS_SECS = 5;
    private static final String INDEXER_FILE = "/indexer/kafka_index_task.json";
    private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
    private static final String DATASOURCE = "kafka_test";
    private static final String TOPIC_NAME = "kafkaTopic";
    private static final int MINUTES_TO_SEND = 2;

    // Formats a time as the start of its minute (seconds are a literal "00").
    private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
    // Formats a time with second precision and a literal ".000Z" suffix.
    private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");

    // Id returned by the indexer for the submitted task; null until the task has been submitted.
    private String taskID;
    // ZooKeeper client used to create/delete the topic; null until it has been created.
    private ZkClient zkClient;
    // Set once the coordinator reports the segments as loaded; teardown only unloads data if true.
    private boolean segmentsExist;
    // Timestamps of the first and last event handed to the producer.
    private DateTime dtFirst;
    private DateTime dtLast;

    @Inject
    private TestQueryHelper queryHelper;

    @Inject
    private IntegrationTestingConfig config;

    // Event JSON template. Placeholders, in order: timestamp, added, deleted, delta.
    final String event_template = "{\"timestamp\": \"%s\",\"page\": \"Gypsy Danger\",\"language\" : \"en\",\"user\" : \"nuclear\",\"unpatrolled\" : \"true\",\"newPage\" : \"true\",\"robot\": \"false\",\"anonymous\": \"false\",\"namespace\":\"article\",\"continent\":\"North America\",\"country\":\"United States\",\"region\":\"Bay Area\",\"city\":\"San Francisco\",\"added\":%d,\"deleted\":%d,\"delta\":%d}";

    /**
     * Runs the end-to-end scenario: create topic, submit task, publish events, verify queries,
     * wait for segments to load, verify queries again.
     */
    @Test
    public void testKafka() {
        LOG.info("Starting test: ITKafkaTest");

        createTopic();

        String indexerSpec = loadIndexerSpec();
        this.taskID = this.indexer.submitTask(indexerSpec);
        LOG.info("-------------SUBMITTED TASK");

        int numEvents = 0;
        int added = 0;
        Producer<String, String> producer = newProducer();
        try {
            DateTimeFormatter eventFmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
            DateTime dt = new DateTime(DateTimeZone.UTC);
            this.dtFirst = dt;
            this.dtLast = dt;
            DateTime dtStop = this.dtFirst.plusMinutes(MINUTES_TO_SEND).plusSeconds(30);

            while (dt.compareTo(dtStop) < 0) {
                // The n-th event carries added = n, so the running sum is the expected "added" total.
                numEvents++;
                added += numEvents;
                String event = String.format(this.event_template, eventFmt.print(dt), numEvents, 0, numEvents);
                LOG.info("sending event: [%s]", event);
                producer.send(new KeyedMessage<String, String>(TOPIC_NAME, event));
                pauseBetweenEvents();
                this.dtLast = dt;
                dt = new DateTime(DateTimeZone.UTC);
            }
        } finally {
            // Close the producer even when sending fails so the connection is not leaked.
            producer.close();
        }

        String queries = loadQueries(added, numEvents);
        try {
            // First pass: run the queries right after the events were sent.
            // NOTE(review): the second argument is the literal 2; presumably a run/retry count — confirm against TestQueryHelper.
            this.queryHelper.testQueriesFromString(queries, 2);

            RetryUtil.retryUntil(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    return coordinator.areSegmentsLoaded(DATASOURCE);
                }
            }, true, 30000L, 10, "Real-time generated segments loaded");
            LOG.info("segments are present");
            this.segmentsExist = true;

            // Second pass: same queries once the coordinator reports the segments as loaded.
            this.queryHelper.testQueriesFromString(queries, 2);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Waits for the indexing task, deletes the Kafka topic and, if segments were loaded,
     * unloads and kills the test datasource. Every step is skipped when the resource it
     * needs was never created, so a test that failed early does not also fail in teardown.
     */
    @AfterClass
    public void afterClass() {
        LOG.info("teardown");

        if (this.taskID != null) {
            this.indexer.waitUntilTaskCompletes(this.taskID);
        }

        if (this.zkClient != null) {
            try {
                AdminUtils.deleteTopic(this.zkClient, TOPIC_NAME);
            } finally {
                this.zkClient.close();
            }
        }

        if (this.segmentsExist) {
            try {
                unloadAndKillData(DATASOURCE);
            } catch (Exception e) {
                LOG.warn(e, "exception while removing segments: [%s]", e.getMessage());
            }
        }
    }

    /**
     * Connects to ZooKeeper and creates {@link #TOPIC_NAME} with one partition and a
     * replication factor of one. An already existing topic is accepted.
     *
     * @throws ISE if the topic cannot be created for any other reason
     */
    private void createTopic() {
        try {
            this.zkClient = new ZkClient(this.config.getZookeeperHosts(), 10000, 10000, ZKStringSerializer$.MODULE$);
            AdminUtils.createTopic(this.zkClient, TOPIC_NAME, 1, 1, new Properties());
        } catch (TopicExistsException ignored) {
            // The topic is already there, which is all this test needs.
        } catch (Exception e) {
            throw new ISE(e, "could not create kafka topic");
        }
    }

    /**
     * Reads {@link #INDEXER_FILE} and fills in its {@code %%...%%} placeholders.
     *
     * @return the task spec ready to be submitted
     * @throws ISE if the file cannot be read
     */
    private String loadIndexerSpec() {
        try {
            LOG.info("indexerFile name: [%s]", INDEXER_FILE);
            DateTime shutoffTime = new DateTime(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(4L));
            // Literal replace(): the substituted values must not be interpreted as regex replacements.
            String spec = getTaskAsString(INDEXER_FILE)
                .replace("%%TOPIC%%", TOPIC_NAME)
                .replace("%%ZOOKEEPER_SERVER%%", this.config.getZookeeperHosts())
                .replace("%%GROUP_ID%%", Long.toString(System.currentTimeMillis()))
                .replace("%%SHUTOFFTIME%%", shutoffTime.toString());
            LOG.info("indexerFile: [%s]\n", spec);
            return spec;
        } catch (Exception e) {
            // Logged as well as thrown so the message shows up in the log output.
            LOG.error("could not read indexer file [%s]", INDEXER_FILE);
            throw new ISE(e, "could not read indexer file [%s]", INDEXER_FILE);
        }
    }

    /**
     * Builds an async string producer pointed at the configured Kafka host.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    private Producer<String, String> newProducer() {
        Properties props = new Properties();
        props.put("metadata.broker.list", this.config.getKafkaHost());
        LOG.info("kafka host: [%s]", this.config.getKafkaHost());
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("producer.type", "async");
        return new Producer<String, String>(new ProducerConfig(props));
    }

    /**
     * Sleeps for {@link #DELAY_BETWEEN_EVENTS_SECS} seconds.
     *
     * @throws ISE if the thread is interrupted; the interrupt flag is restored first
     */
    private static void pauseBetweenEvents() {
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(DELAY_BETWEEN_EVENTS_SECS));
        } catch (InterruptedException e) {
            // Without the delay the send loop would spin, so abort instead of continuing.
            Thread.currentThread().interrupt();
            throw new ISE(e, "interrupted while sending events to kafka");
        }
    }

    /**
     * Reads {@link #QUERIES_FILE} and fills in its {@code %%...%%} placeholders from the
     * recorded first/last event times and the given totals.
     *
     * @param added     sum of the {@code added} values of all events sent
     * @param numEvents number of events sent
     * @return the query definitions with all placeholders substituted
     * @throws ISE if the file is missing or cannot be read
     */
    private String loadQueries(int added, int numEvents) {
        InputStream is = ITKafkaTest.class.getResourceAsStream(QUERIES_FILE);
        if (is == null) {
            throw new ISE("could not open query file: %s", QUERIES_FILE);
        }
        try {
            return IOUtils.toString(is, "UTF-8")
                .replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(this.dtFirst))
                .replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(this.dtLast))
                .replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(this.dtFirst))
                .replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(this.dtFirst))
                .replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(this.dtFirst.plusMinutes(4)))
                .replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(this.dtFirst))
                .replace("%%TIMESERIES_ADDED%%", Integer.toString(added))
                .replace("%%TIMESERIES_NUMEVENTS%%", Integer.toString(numEvents));
        } catch (IOException e) {
            throw new ISE(e, "could not read query file: %s", QUERIES_FILE);
        } finally {
            IOUtils.closeQuietly(is);
        }
    }
}
