package io.druid.indexing.firehose;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Module;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.guice.GuiceAnnotationIntrospector;
import io.druid.guice.GuiceInjectableValues;
import io.druid.guice.GuiceInjectors;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.LocalLoadSpec;
import io.druid.segment.loading.QueryableIndexFactory;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.class */
public class IngestSegmentFirehoseFactoryTest {
    // Shared test infrastructure. Every blank static final below is assigned in the
    // static initializer at the bottom of this class.
    private static final ObjectMapper MAPPER;
    private static final IndexMerger INDEX_MERGER;
    private static final IndexMergerV9 INDEX_MERGER_V9;
    private static final IndexIO INDEX_IO;
    private static final Logger log;
    // Interval used both for the generated segments and for the factory under test.
    private static final Interval FOREVER;
    private static final String DATA_SOURCE_NAME = "testDataSource";
    private static final String DATA_SOURCE_VERSION = "version";
    // Binary version passed to every DataSegment built by buildSegment.
    private static final Integer BINARY_VERSION;
    // Column names and values used by buildRow to fabricate input rows.
    private static final String DIM_NAME = "testDimName";
    private static final String DIM_VALUE = "testDimValue";
    // Input columns that the aggregators in constructorFeeder read from.
    private static final String DIM_LONG_NAME = "testDimLongName";
    private static final String DIM_FLOAT_NAME = "testDimFloatName";
    // Output metric names produced by those aggregators.
    private static final String METRIC_LONG_NAME = "testLongMetric";
    private static final String METRIC_FLOAT_NAME = "testFloatMetric";
    private static final Long METRIC_LONG_VALUE;
    private static final Float METRIC_FLOAT_VALUE;
    private static final String TIME_COLUMN = "ts";
    // Number of segments registered by setUpStatic (one per shard number).
    private static final Integer MAX_SHARD_NUMBER;
    // Number of rows added to the incremental index in constructorFeeder.
    private static final Integer MAX_ROWS;
    // Temporary root directory; passed to recursivelyDelete in tearDownStatic.
    private static final File tmpDir;
    // Directory under tmpDir that the persisted index is written to.
    private static final File persistDir;
    // Segments returned by the metadata coordinator stub; filled in setUpStatic.
    private static final List<DataSegment> segmentSet;
    // Per-parameter-set state injected through the constructor.
    private final IngestSegmentFirehoseFactory factory;
    private final InputRowParser rowParser;
    // Parser used to build the persisted index; also one of the two parsers under test.
    private static final InputRowParser<Map<String, Object>> ROW_PARSER;

    /**
     * Builds the parameter sets consumed by the {@link Parameterized} runner.
     *
     * <p>The method first fills an on-heap incremental index with {@code MAX_ROWS} rows and
     * persists it to {@code persistDir}, then wires a {@link TaskToolboxFactory} out of stub
     * collaborators, and finally creates one {@link IngestSegmentFirehoseFactory} for every
     * combination of parser (2), dimension list (null or explicit) and metric list (null or
     * explicit), i.e. eight parameter sets.
     *
     * @return one {@code Object[]{factory, displayName, parser}} per combination
     * @throws IOException if the persist directory cannot be created or the index cannot be written
     */
    @Parameterized.Parameters(name = "{1}")
    public static Collection<Object[]> constructorFeeder() throws IOException {
        final IndexSpec indexSpec = new IndexSpec();
        final HeapMemoryTaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null) {
        });

        // Build and populate the in-memory index that backs every test segment.
        final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
                .withQueryGranularity(QueryGranularities.NONE)
                .withMinTimestamp(-4611686018427387904L)
                .withDimensionsSpec(ROW_PARSER)
                .withMetrics(new AggregatorFactory[]{
                        new LongSumAggregatorFactory(METRIC_LONG_NAME, DIM_LONG_NAME),
                        new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME)
                })
                .build();
        final OnheapIncrementalIndex index =
                new OnheapIncrementalIndex(schema, true, MAX_ROWS.intValue() * MAX_SHARD_NUMBER.intValue());
        for (int rowNum = 0; rowNum < MAX_ROWS.intValue(); rowNum++) {
            index.add(ROW_PARSER.parse(buildRow(Long.valueOf(rowNum))));
        }

        // mkdirs() returns false when the directory already exists, hence the second check.
        if (!persistDir.mkdirs() && !persistDir.exists()) {
            throw new IOException(String.format("Could not create directory at [%s]", persistDir.getAbsolutePath()));
        }
        INDEX_MERGER.persist(index, persistDir, indexSpec);

        // Metadata coordinator stub: always reports the statically registered segments as used.
        final IndexerSQLMetadataStorageCoordinator coordinator = new IndexerSQLMetadataStorageCoordinator(null, null, null) {
            private final Set<DataSegment> published = Sets.newHashSet();
            private final Set<DataSegment> nuked = Sets.newHashSet();

            public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException {
                return ImmutableList.copyOf(IngestSegmentFirehoseFactoryTest.segmentSet);
            }

            public List<DataSegment> getUsedSegmentsForIntervals(String dataSource, List<Interval> intervals) throws IOException {
                return ImmutableList.copyOf(IngestSegmentFirehoseFactoryTest.segmentSet);
            }

            public List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval) {
                return ImmutableList.of();
            }

            public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) {
                // Only segments that were not published before are reported back.
                final Set<DataSegment> added = Sets.newHashSet();
                for (DataSegment segment : segments) {
                    if (this.published.add(segment)) {
                        added.add(segment);
                    }
                }
                return ImmutableSet.copyOf(added);
            }

            public void deleteSegments(Set<DataSegment> segments) {
                this.nuked.addAll(segments);
            }
        };
        final LocalTaskActionClientFactory actionClientFactory = new LocalTaskActionClientFactory(
                taskStorage,
                new TaskActionToolbox(new TaskLockbox(taskStorage), coordinator, newMockEmitter())
        );

        final SegmentHandoffNotifierFactory handoffNotifierFactory =
                (SegmentHandoffNotifierFactory) EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
        EasyMock.replay(handoffNotifierFactory);

        // Toolbox factory assembled from pass-through stubs; unused collaborators are null.
        final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
                new TaskConfig(tmpDir.getAbsolutePath(), (String) null, (String) null, 50000, (List) null, false, (Period) null, (Period) null),
                actionClientFactory,
                newMockEmitter(),
                new DataSegmentPusher() {
                    public String getPathForHadoop(String dataSource) {
                        throw new UnsupportedOperationException();
                    }

                    public DataSegment push(File file, DataSegment segment) throws IOException {
                        return segment;
                    }
                },
                new DataSegmentKiller() {
                    public void kill(DataSegment segment) throws SegmentLoadingException {
                    }
                },
                new DataSegmentMover() {
                    public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException {
                        return segment;
                    }
                },
                new DataSegmentArchiver() {
                    public DataSegment archive(DataSegment segment) throws SegmentLoadingException {
                        return segment;
                    }

                    public DataSegment restore(DataSegment segment) throws SegmentLoadingException {
                        return segment;
                    }
                },
                (DataSegmentAnnouncer) null,
                handoffNotifierFactory,
                (QueryRunnerFactoryConglomerate) null,
                (ExecutorService) null,
                (MonitorScheduler) null,
                new SegmentLoaderFactory(new SegmentLoaderLocalCacheManager((QueryableIndexFactory) null, new SegmentLoaderConfig() {
                    public List<StorageLocationConfig> getLocations() {
                        return Lists.newArrayList();
                    }
                }, MAPPER)),
                MAPPER,
                INDEX_MERGER,
                INDEX_IO,
                (Cache) null,
                (CacheConfig) null,
                INDEX_MERGER_V9
        );

        // Second parser under test: same timestamp spec, but no explicit dimension schemas.
        final InputRowParser<Map<String, Object>> parserWithoutDims = new MapInputRowParser(
                new JSONParseSpec(
                        new TimestampSpec(TIME_COLUMN, "auto", (DateTime) null),
                        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of()), ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME), ImmutableList.of())
                )
        );

        final List<Object[]> parameters = new LinkedList<>();
        for (InputRowParser<Map<String, Object>> parser : Arrays.asList(ROW_PARSER, parserWithoutDims)) {
            for (List<String> dimNames : Arrays.asList(null, ImmutableList.of(DIM_NAME))) {
                for (List<String> metricNames : Arrays.asList(null, ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME))) {
                    final IngestSegmentFirehoseFactory firehoseFactory = new IngestSegmentFirehoseFactory(
                            DATA_SOURCE_NAME,
                            FOREVER,
                            new SelectorDimFilter(DIM_NAME, DIM_VALUE, (ExtractionFn) null),
                            dimNames,
                            metricNames,
                            Guice.createInjector(new Module[]{new Module() {
                                public void configure(Binder binder) {
                                    binder.bind(TaskToolboxFactory.class).toInstance(taskToolboxFactory);
                                }
                            }}),
                            INDEX_IO
                    );
                    // Display name shown by the Parameterized runner (parameter index 1).
                    final String displayName = String.format(
                            "DimNames[%s]MetricNames[%s]ParserDimNames[%s]",
                            dimNames == null ? "null" : "dims",
                            metricNames == null ? "null" : "metrics",
                            parser == ROW_PARSER ? "dims" : "null"
                    );
                    parameters.add(new Object[]{firehoseFactory, displayName, parser});
                }
            }
        }
        return parameters;
    }

    /**
     * Configures the given mapper for this test: registers {@link LocalLoadSpec} as a subtype,
     * pairs a {@link GuiceAnnotationIntrospector} with the mapper's existing serialization and
     * deserialization introspectors, and installs Guice-backed injectable values from a startup
     * injector that binds {@link LocalDataSegmentPuller}.
     *
     * @param objectMapper the mapper to configure; it is modified in place
     * @return the same {@code objectMapper} instance
     */
    public static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper) {
        final SimpleModule subtypeModule = new SimpleModule("testModule").registerSubtypes(LocalLoadSpec.class);
        objectMapper.registerModule(subtypeModule);

        // The Guice introspector takes precedence; the mapper's current one is the fallback.
        final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
        final AnnotationIntrospectorPair serializationPair = new AnnotationIntrospectorPair(
                guiceIntrospector,
                objectMapper.getSerializationConfig().getAnnotationIntrospector()
        );
        final AnnotationIntrospectorPair deserializationPair = new AnnotationIntrospectorPair(
                guiceIntrospector,
                objectMapper.getDeserializationConfig().getAnnotationIntrospector()
        );
        objectMapper.setAnnotationIntrospectors(serializationPair, deserializationPair);

        final Module pullerModule = new Module() {
            public void configure(Binder binder) {
                binder.bind(LocalDataSegmentPuller.class);
            }
        };
        objectMapper.setInjectableValues(
                new GuiceInjectableValues(GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(pullerModule)))
        );
        return objectMapper;
    }

    /**
     * Receives one parameter set produced by {@code constructorFeeder()}.
     *
     * @param ingestSegmentFirehoseFactory the factory under test
     * @param str the display name of the parameter set; not stored
     * @param inputRowParser the parser later handed to the factory's {@code connect} call
     */
    public IngestSegmentFirehoseFactoryTest(IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory, String str, InputRowParser inputRowParser) {
        this.rowParser = inputRowParser;
        this.factory = ingestSegmentFirehoseFactory;
    }

    /**
     * Creates the raw event map for a single input row.
     *
     * @param l the value stored under {@code TIME_COLUMN}
     * @return an immutable map holding the timestamp, the string dimension and the two numeric
     *         input columns, in that order
     */
    private static Map<String, Object> buildRow(Long l) {
        return ImmutableMap.<String, Object>builder()
                .put(TIME_COLUMN, l)
                .put(DIM_NAME, DIM_VALUE)
                .put(DIM_FLOAT_NAME, METRIC_FLOAT_VALUE)
                .put(DIM_LONG_NAME, METRIC_LONG_VALUE)
                .build();
    }

    /**
     * Creates the descriptor of one test segment whose load spec points at {@code persistDir}.
     *
     * @param num the partition number; must satisfy {@code 0 <= num < MAX_SHARD_NUMBER}
     * @return a segment covering {@code FOREVER} with a {@link NumberedShardSpec} for {@code num}
     * @throws IllegalArgumentException if {@code num} is out of range
     */
    private static DataSegment buildSegment(Integer num) {
        final int partitionNum = num.intValue();
        final int partitionCount = MAX_SHARD_NUMBER.intValue();
        Preconditions.checkArgument(partitionNum < partitionCount);
        Preconditions.checkArgument(partitionNum >= 0);

        final Map<String, Object> loadSpec =
                ImmutableMap.<String, Object>of("type", "local", "path", persistDir.getAbsolutePath());
        final List<String> dimensions = ImmutableList.of(DIM_NAME);
        final List<String> metrics = ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME);
        return new DataSegment(
                DATA_SOURCE_NAME,
                FOREVER,
                DATA_SOURCE_VERSION,
                loadSpec,
                dimensions,
                metrics,
                new NumberedShardSpec(partitionNum, partitionCount),
                BINARY_VERSION,
                0L
        );
    }

    /**
     * Registers one segment per shard number in {@code segmentSet} before any test runs.
     *
     * @throws IOException declared by the signature; nothing in the body is declared to throw it
     */
    @BeforeClass
    public static void setUpStatic() throws IOException {
        final int shardCount = MAX_SHARD_NUMBER.intValue();
        int shard = 0;
        while (shard < shardCount) {
            segmentSet.add(buildSegment(shard));
            shard++;
        }
    }

    /**
     * Cleans up after the whole test class by handing {@code tmpDir} to
     * {@code recursivelyDelete}.
     */
    @AfterClass
    public static void tearDownStatic() {
        final File scratchRoot = tmpDir;
        recursivelyDelete(scratchRoot);
    }

    /**
     * Deletes {@code file} and, if it is a directory, everything beneath it.
     *
     * <p>Deletion is best effort: a failure is logged as a warning and the walk continues, so a
     * single undeletable entry does not stop the rest of the cleanup.
     *
     * @param file the file or directory to remove; {@code null} is ignored
     */
    private static void recursivelyDelete(File file) {
        if (file == null) {
            return;
        }
        if (file.isDirectory()) {
            // listFiles() returns null on an I/O error or if the directory vanished meanwhile.
            final File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    recursivelyDelete(child);
                }
            }
        }
        // Children are gone by now, so a directory can be removed as well. Previously only
        // plain files were deleted and the emptied directory tree was left behind.
        if (!file.delete()) {
            log.warn("Could not delete file at [%s]", file.getAbsolutePath());
        }
    }

    /**
     * Checks that the factory reports the data source and interval it was built with, and,
     * when dimensions or metrics were supplied, that they match the expected names.
     */
    @Test
    public void sanityTest() {
        Assert.assertEquals(DATA_SOURCE_NAME, this.factory.getDataSource());

        final List<String> dimensions = this.factory.getDimensions();
        if (dimensions != null) {
            Assert.assertArrayEquals(new String[]{DIM_NAME}, dimensions.toArray());
        }

        Assert.assertEquals(FOREVER, this.factory.getInterval());

        final List<String> metrics = this.factory.getMetrics();
        if (metrics != null) {
            // Metric order is not asserted, only the set of names.
            Assert.assertEquals(ImmutableSet.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME), ImmutableSet.copyOf(metrics));
        }
    }

    /**
     * Reads every row from a firehose opened by the factory under test and verifies each row's
     * dimension and metric values as well as the total row count.
     *
     * <p>The firehose is managed by try-with-resources so it is closed on every path. If an
     * assertion fails and {@code close()} also fails, the assertion failure stays the primary
     * exception and the close failure is attached to it as suppressed.
     *
     * @throws IOException if connecting to or closing the firehose fails
     */
    @Test
    public void simpleFirehoseReadingTest() throws IOException {
        Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size());

        int rowCount = 0;
        try (IngestSegmentFirehose firehose = this.factory.connect(this.rowParser)) {
            while (firehose.hasMore()) {
                final InputRow row = firehose.nextRow();
                Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray());
                Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray());
                Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getLongMetric(METRIC_LONG_NAME));
                // Float metric is compared with a relative tolerance of 1e-4.
                Assert.assertEquals(
                        METRIC_FLOAT_VALUE.floatValue(),
                        row.getFloatMetric(METRIC_FLOAT_NAME),
                        METRIC_FLOAT_VALUE.floatValue() * 1.0E-4d
                );
                rowCount++;
            }
        }

        // Every registered segment points at the same persisted index of MAX_ROWS rows.
        Assert.assertEquals(MAX_SHARD_NUMBER.intValue() * MAX_ROWS.intValue(), rowCount);
    }

    /**
     * Creates a {@link ServiceEmitter} whose {@code emit} overloads do nothing.
     *
     * @return a new no-op emitter constructed with {@code null} collaborators
     */
    private static ServiceEmitter newMockEmitter() {
        final ServiceEmitter silentEmitter = new ServiceEmitter(null, null, null) {
            public void emit(Event event) {
                // intentionally empty: events are discarded
            }

            public void emit(ServiceEventBuilder serviceEventBuilder) {
                // intentionally empty: events are discarded
            }
        };
        return silentEmitter;
    }

    static {
        TestUtils testUtils = new TestUtils();
        MAPPER = setupInjectablesInObjectMapper(testUtils.getTestObjectMapper());
        INDEX_MERGER = testUtils.getTestIndexMerger();
        INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9();
        INDEX_IO = testUtils.getTestIndexIO();
        log = new Logger(IngestSegmentFirehoseFactoryTest.class);
        FOREVER = new Interval(-4611686018427387904L, 4611686018427387903L);
        BINARY_VERSION = -1;
        METRIC_LONG_VALUE = 1L;
        METRIC_FLOAT_VALUE = Float.valueOf(1.0f);
        MAX_SHARD_NUMBER = 10;
        MAX_ROWS = 10;
        tmpDir = Files.createTempDir();
        persistDir = Paths.get(tmpDir.getAbsolutePath(), "indexTestMerger").toFile();
        segmentSet = new ArrayList(MAX_SHARD_NUMBER.intValue());
        ROW_PARSER = new MapInputRowParser(new JSONParseSpec(new TimestampSpec(TIME_COLUMN, "auto", (DateTime) null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(DIM_NAME)), ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME), ImmutableList.of())));
    }
}
