package io.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IOConfig;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.TuningConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.RejectionPolicyFactory;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

/* loaded from: input_file:io/druid/indexing/common/task/IndexTask.class */
public class IndexTask extends AbstractFixedIntervalTask {
    /** Logger used by every method of this task. */
    private static final Logger log = new Logger(IndexTask.class);

    /**
     * Hash applied to the serialized group key of each row in {@code determinePartitions} before it
     * is added to the cardinality collector. Never reassigned, hence {@code final}.
     */
    private static final HashFunction hashFunction = Hashing.murmur3_128();

    /** The ingestion spec this task was created with; exposed as JSON through {@code getIngestionSchema()}. */
    @JsonIgnore
    private final IndexIngestionSpec ingestionSchema;

    /** Mapper used to serialize row group keys and handed to the hash-based shard specs. */
    private final ObjectMapper jsonMapper;

    /**
     * I/O section of an index task spec. Its only content is the {@link FirehoseFactory} that the
     * task connects to whenever it needs to read the input rows.
     */
    @JsonTypeName("index")
    public static class IndexIOConfig implements IOConfig {
        private final FirehoseFactory firehoseFactory;

        /**
         * @param firehoseFactory value of the {@code firehose} JSON property; stored as given,
         *                        without validation
         */
        @JsonCreator
        public IndexIOConfig(@JsonProperty("firehose") FirehoseFactory firehoseFactory) {
            this.firehoseFactory = firehoseFactory;
        }

        /**
         * @return the factory that was passed to the constructor
         */
        @JsonProperty("firehose")
        public FirehoseFactory getFirehoseFactory() {
            return firehoseFactory;
        }
    }

    /* loaded from: input_file:io/druid/indexing/common/task/IndexTask$IndexIngestionSpec.class */
    /**
     * Complete ingestion spec of an index task: data schema, I/O config and tuning config.
     *
     * <p>When no tuning config is supplied, a default {@link IndexTuningConfig} is created and
     * returned by the accessors of this class. Note that the superclass constructor still receives
     * the original (possibly {@code null}) tuning config.
     */
    public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig> {
        private final DataSchema dataSchema;
        private final IndexIOConfig ioConfig;
        private final IndexTuningConfig tuningConfig;

        /**
         * @param dataSchema        value of the {@code dataSchema} JSON property
         * @param indexIOConfig     value of the {@code ioConfig} JSON property
         * @param indexTuningConfig value of the {@code tuningConfig} JSON property; {@code null}
         *                          selects an all-defaults {@link IndexTuningConfig}
         */
        @JsonCreator
        public IndexIngestionSpec(@JsonProperty("dataSchema") DataSchema dataSchema, @JsonProperty("ioConfig") IndexIOConfig indexIOConfig, @JsonProperty("tuningConfig") IndexTuningConfig indexTuningConfig) {
            super(dataSchema, indexIOConfig, indexTuningConfig);
            this.dataSchema = dataSchema;
            this.ioConfig = indexIOConfig;
            this.tuningConfig = indexTuningConfig == null ? new IndexTuningConfig(0, 0, null, null, null) : indexTuningConfig;
        }

        /**
         * @return the data schema passed to the constructor
         */
        @JsonProperty("dataSchema")
        public DataSchema getDataSchema() {
            return this.dataSchema;
        }

        /**
         * Returns the I/O config under its real accessor name.
         *
         * <p>NOTE(review): the decompiler had renamed this method to {@code m16getIOConfig}
         * ("merged with bridge method"), which presumably means it overrides a generic getter of
         * {@code IngestionSpec}. Confirm against the superclass and add {@code @Override} if so.
         *
         * @return the I/O config passed to the constructor
         */
        @JsonProperty("ioConfig")
        public IndexIOConfig getIOConfig() {
            return this.ioConfig;
        }

        /**
         * Returns the effective tuning config under its real accessor name.
         *
         * <p>NOTE(review): the decompiler had renamed this method to {@code m15getTuningConfig}
         * ("merged with bridge method"), which presumably means it overrides a generic getter of
         * {@code IngestionSpec}. Confirm against the superclass and add {@code @Override} if so.
         *
         * @return the supplied tuning config, or the default one when none was supplied
         */
        @JsonProperty("tuningConfig")
        public IndexTuningConfig getTuningConfig() {
            return this.tuningConfig;
        }

        /**
         * Decompiler-generated alias of {@link #getIOConfig()}, kept so existing callers compile.
         * It carries no JSON annotation so that the {@code ioConfig} property is declared only once.
         *
         * @return same value as {@link #getIOConfig()}
         */
        public IndexIOConfig m16getIOConfig() {
            return getIOConfig();
        }

        /**
         * Decompiler-generated alias of {@link #getTuningConfig()}, kept so existing callers compile.
         * It carries no JSON annotation so that the {@code tuningConfig} property is declared only once.
         *
         * @return same value as {@link #getTuningConfig()}
         */
        public IndexTuningConfig m15getTuningConfig() {
            return getTuningConfig();
        }
    }

    /**
     * Tuning section of an index task spec.
     *
     * <p>A value of {@code 0} for the target partition size or the row flush boundary is replaced
     * by the corresponding default, and a missing shard count is stored as {@code -1}. The
     * constructor requires that the resolved target partition size or the resolved shard count be
     * {@code -1}. Because {@code 0} resolves to the default partition size, a caller that wants to
     * set a shard count has to pass {@code -1} as target partition size explicitly.
     */
    @JsonTypeName("index")
    public static class IndexTuningConfig implements TuningConfig {
        private static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000;
        private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 75000;
        private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
        private static final Boolean DEFAULT_BUILD_V9_DIRECTLY = Boolean.FALSE;

        private final int targetPartitionSize;
        private final int rowFlushBoundary;
        private final int numShards;
        private final IndexSpec indexSpec;
        private final Boolean buildV9Directly;

        /**
         * @param i         {@code targetPartitionSize}; {@code 0} selects the default
         * @param i2        {@code rowFlushBoundary}; must not be negative, {@code 0} selects the default
         * @param num       {@code numShards}; {@code null} is stored as {@code -1}
         * @param indexSpec {@code indexSpec}; {@code null} selects a default {@link IndexSpec}
         * @param bool      {@code buildV9Directly}; {@code null} selects {@code false}
         * @throws IllegalArgumentException if {@code i2} is negative, or if neither the resolved
         *                                  target partition size nor the shard count is {@code -1}
         */
        @JsonCreator
        public IndexTuningConfig(@JsonProperty("targetPartitionSize") int i, @JsonProperty("rowFlushBoundary") int i2, @JsonProperty("numShards") @Nullable Integer num, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, @JsonProperty("buildV9Directly") Boolean bool) {
            Preconditions.checkArgument(i2 >= 0, "rowFlushBoundary should be positive or zero");

            final int resolvedTargetPartitionSize = (i != 0) ? i : DEFAULT_TARGET_PARTITION_SIZE;
            final int resolvedNumShards = (num != null) ? num.intValue() : -1;
            final boolean onlyOneStrategy = resolvedTargetPartitionSize == -1 || resolvedNumShards == -1;
            Preconditions.checkArgument(onlyOneStrategy, "targetPartitionsSize and shardCount both cannot be set");

            this.targetPartitionSize = resolvedTargetPartitionSize;
            this.rowFlushBoundary = (i2 != 0) ? i2 : DEFAULT_ROW_FLUSH_BOUNDARY;
            this.numShards = resolvedNumShards;
            this.indexSpec = (indexSpec != null) ? indexSpec : DEFAULT_INDEX_SPEC;
            this.buildV9Directly = (bool != null) ? bool : DEFAULT_BUILD_V9_DIRECTLY;
        }

        /**
         * @return the resolved target partition size
         */
        @JsonProperty
        public int getTargetPartitionSize() {
            return targetPartitionSize;
        }

        /**
         * @return the resolved row flush boundary
         */
        @JsonProperty
        public int getRowFlushBoundary() {
            return rowFlushBoundary;
        }

        /**
         * @return the shard count, or {@code -1} when none was given
         */
        @JsonProperty
        public int getNumShards() {
            return numShards;
        }

        /**
         * @return the index spec, never {@code null}
         */
        @JsonProperty
        public IndexSpec getIndexSpec() {
            return indexSpec;
        }

        /**
         * @return the build-V9-directly flag, never {@code null}
         */
        @JsonProperty
        public Boolean getBuildV9Directly() {
            return buildV9Directly;
        }
    }

    /**
     * Decides whether a row belongs to the segment currently being generated.
     *
     * @return {@code true} when the row's timestamp lies inside {@code interval} and the shard spec
     *         accepts the row at its truncated timestamp
     */
    private static boolean shouldIndex(ShardSpec shardSpec, Interval interval, InputRow inputRow, QueryGranularity queryGranularity) {
        final long timestamp = inputRow.getTimestampFromEpoch();
        if (!interval.contains(timestamp)) {
            return false;
        }
        return shardSpec.isInChunk(queryGranularity.truncate(timestamp), inputRow);
    }

    /**
     * Chooses the task id.
     *
     * @param id              explicitly requested id, may be {@code null}
     * @param ingestionSchema spec whose data source is used when an id has to be generated
     * @return {@code id} when it is non-null, otherwise {@code index_<dataSource>_<current time>}
     */
    private static String makeId(String id, IndexIngestionSpec ingestionSchema) {
        if (id != null) {
            return id;
        }
        final String dataSource = makeDataSource(ingestionSchema);
        return String.format("index_%s_%s", dataSource, new DateTime().toString());
    }

    /**
     * @return the data source name declared in the data schema of {@code ingestionSchema}
     */
    private static String makeDataSource(IndexIngestionSpec ingestionSchema) {
        final DataSchema dataSchema = ingestionSchema.getDataSchema();
        return dataSchema.getDataSource();
    }

    /**
     * Computes the overall interval of the task: from the start of the first bucket interval of the
     * granularity spec to the end of its last bucket interval.
     */
    private static Interval makeInterval(IndexIngestionSpec ingestionSchema) {
        final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
        final SortedSet<Interval> buckets = granularitySpec.bucketIntervals().get();
        final DateTime start = buckets.first().getStart();
        final DateTime end = buckets.last().getEnd();
        return new Interval(start, end);
    }

    /**
     * Builds the {@link RealtimeTuningConfig} handed to the plumber. Only the shard spec, the index
     * spec and the two values below are taken from the arguments; every other constructor argument
     * is a fixed {@code null}, {@code 0} or {@code true}.
     *
     * @param shardSpec shard spec of the segment being generated
     * @param i         row flush boundary chosen by the caller
     * @param indexSpec index spec of the task
     * @param z         whether to build V9 directly, as configured in the task's tuning config
     */
    static RealtimeTuningConfig convertTuningConfig(ShardSpec shardSpec, int i, IndexSpec indexSpec, boolean z) {
        final Integer rowFlushBoundary = Integer.valueOf(i);
        final Boolean buildV9Directly = Boolean.valueOf(z);
        return new RealtimeTuningConfig(
                rowFlushBoundary,
                (Period) null,
                (Period) null,
                (File) null,
                (VersioningPolicy) null,
                (RejectionPolicyFactory) null,
                (Integer) null,
                shardSpec,
                indexSpec,
                buildV9Directly,
                0,
                0,
                true,
                (Long) null);
    }

    /**
     * Creates an index task from its JSON description.
     *
     * @param str                task id; when {@code null} an id is generated from the data source
     *                           and the current time
     * @param taskResource       value of the {@code resource} JSON property, passed to the superclass
     * @param indexIngestionSpec ingestion spec; also supplies the data source and the overall interval
     * @param objectMapper       injected mapper, kept for later serialization work
     * @param map                value of the {@code context} JSON property, passed to the superclass
     */
    @JsonCreator
    public IndexTask(
            @JsonProperty("id") String str,
            @JsonProperty("resource") TaskResource taskResource,
            @JsonProperty("spec") IndexIngestionSpec indexIngestionSpec,
            @JacksonInject ObjectMapper objectMapper,
            @JsonProperty("context") Map<String, Object> map) {
        super(
                makeId(str, indexIngestionSpec),
                taskResource,
                makeDataSource(indexIngestionSpec),
                makeInterval(indexIngestionSpec),
                map);
        ingestionSchema = indexIngestionSpec;
        jsonMapper = objectMapper;
    }

    /**
     * @return the type name of this task, always {@code "index"}
     */
    @Override // io.druid.indexing.common.task.Task
    public String getType() {
        final String type = "index";
        return type;
    }

    /**
     * @return the ingestion spec this task was constructed with, serialized as the {@code spec}
     *         JSON property
     */
    @JsonProperty("spec")
    public IndexIngestionSpec getIngestionSchema() {
        return ingestionSchema;
    }

    /**
     * Runs the task: finds the bucket intervals that actually contain data, chooses the shard specs
     * for each of them, generates one segment per (interval, shard spec) pair and publishes all
     * generated segments together.
     *
     * <p>Shard specs for an interval are chosen as follows: with a positive target partition size
     * they are computed by {@code determinePartitions}; otherwise a positive shard count yields that
     * many hash-based shard specs; otherwise a single {@link NoneShardSpec} is used.
     *
     * @param taskToolbox toolbox used to look up the task lock, build segments and publish them
     * @return a success status carrying this task's id
     * @throws ISE if none of the configured bucket intervals contains data
     */
    @Override // io.druid.indexing.common.task.Task
    public TaskStatus run(TaskToolbox taskToolbox) throws Exception {
        final GranularitySpec granularitySpec = this.ingestionSchema.getDataSchema().getGranularitySpec();
        final int targetPartitionSize = this.ingestionSchema.m15getTuningConfig().getTargetPartitionSize();
        final TaskLock lock = Iterables.getOnlyElement(getTaskLocks(taskToolbox));
        final Set<DataSegment> segments = Sets.newHashSet();

        // Only configured buckets that really contain rows are worth indexing.
        final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals().get(), getDataIntervals());
        if (validIntervals.isEmpty()) {
            throw new ISE("No valid data intervals found. Check your configs!");
        }

        for (final Interval bucket : validIntervals) {
            final List<ShardSpec> shardSpecs;
            if (targetPartitionSize > 0) {
                shardSpecs = determinePartitions(bucket, targetPartitionSize, granularitySpec.getQueryGranularity());
            } else {
                final int numShards = this.ingestionSchema.m15getTuningConfig().getNumShards();
                if (numShards <= 0) {
                    shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec());
                } else {
                    shardSpecs = Lists.newArrayList();
                    for (int partitionNum = 0; partitionNum < numShards; partitionNum++) {
                        shardSpecs.add(new HashBasedNumberedShardSpec(partitionNum, numShards, (List) null, this.jsonMapper));
                    }
                }
            }

            for (final ShardSpec shardSpec : shardSpecs) {
                segments.add(generateSegment(taskToolbox, this.ingestionSchema.getDataSchema(), shardSpec, bucket, lock.getVersion()));
            }
        }

        taskToolbox.publishSegments(segments);
        return TaskStatus.success(getId());
    }

    /**
     * Reads the whole input once and collects the bucket intervals of the granularity spec that
     * contain at least one row.
     *
     * <p>Rows whose timestamp falls into no bucket interval are counted and reported with a single
     * warning after the scan. The firehose is closed on every exit path, including failures of
     * {@code hasMore()} or {@code nextRow()}.
     *
     * @return the matched bucket intervals, sorted by start and then by end
     * @throws IOException if connecting to, reading from or closing the firehose fails
     */
    private SortedSet<Interval> getDataIntervals() throws IOException {
        final FirehoseFactory firehoseFactory = this.ingestionSchema.m16getIOConfig().getFirehoseFactory();
        final GranularitySpec granularitySpec = this.ingestionSchema.getDataSchema().getGranularitySpec();
        final SortedSet<Interval> dataIntervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
        int unmatchedRows = 0;

        // try-with-resources guarantees close() even when hasMore()/nextRow() throws, and keeps a
        // close() failure from masking the original exception (it is added as suppressed instead).
        try (Firehose firehose = firehoseFactory.connect(this.ingestionSchema.getDataSchema().getParser())) {
            while (firehose.hasMore()) {
                final InputRow row = firehose.nextRow();
                final Optional<Interval> bucket = granularitySpec.bucketInterval(new DateTime(row.getTimestampFromEpoch()));
                if (bucket.isPresent()) {
                    dataIntervals.add(bucket.get());
                } else {
                    unmatchedRows++;
                }
            }
        }

        if (unmatchedRows > 0) {
            log.warn("Unable to to find a matching interval for [%,d] events", unmatchedRows);
        }
        return dataIntervals;
    }

    /**
     * Chooses the shard specs for one interval from an estimate of how many distinct rolled-up rows
     * it contains.
     *
     * <p>Every input row inside {@code interval} contributes the hash of its serialized group key
     * (timestamp truncated with {@code queryGranularity}) to a cardinality collector. The number of
     * shards is the estimate divided by {@code targetPartitionSize}, rounded up, and capped at the
     * (integer-truncated) estimate itself. The firehose is closed on every exit path.
     *
     * @param interval            interval whose rows are counted
     * @param targetPartitionSize desired number of rows per shard
     * @param queryGranularity    granularity used to truncate row timestamps before grouping
     * @return a single {@link NoneShardSpec} when exactly one shard is needed, otherwise one
     *         {@link HashBasedNumberedShardSpec} per shard (an empty list when the computed shard
     *         count is zero)
     * @throws IOException if reading the firehose or serializing a group key fails
     */
    private List<ShardSpec> determinePartitions(Interval interval, int targetPartitionSize, QueryGranularity queryGranularity) throws IOException {
        log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize);
        final FirehoseFactory firehoseFactory = this.ingestionSchema.m16getIOConfig().getFirehoseFactory();
        final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();

        // try-with-resources guarantees close() even when hasMore()/nextRow() throws, and keeps a
        // close() failure from masking the original exception (it is added as suppressed instead).
        try (Firehose firehose = firehoseFactory.connect(this.ingestionSchema.getDataSchema().getParser())) {
            while (firehose.hasMore()) {
                final InputRow row = firehose.nextRow();
                if (interval.contains(row.getTimestampFromEpoch())) {
                    final byte[] groupKeyBytes = this.jsonMapper.writeValueAsBytes(
                            Rows.toGroupKey(queryGranularity.truncate(row.getTimestampFromEpoch()), row));
                    collector.add(hashFunction.hashBytes(groupKeyBytes).asBytes());
                }
            }
        }

        final double numRows = collector.estimateCardinality();
        log.info("Estimated approximately [%,f] rows of data.", numRows);

        int numberOfShards = (int) Math.ceil(numRows / targetPartitionSize);
        // Never ask for more shards than there are (estimated) rows.
        if (numberOfShards > numRows) {
            numberOfShards = (int) numRows;
        }
        log.info("Will require [%,d] shard(s).", numberOfShards);

        final List<ShardSpec> shardSpecs = Lists.newArrayList();
        if (numberOfShards == 1) {
            shardSpecs.add(new NoneShardSpec());
        } else {
            for (int partitionNum = 0; partitionNum < numberOfShards; partitionNum++) {
                shardSpecs.add(new HashBasedNumberedShardSpec(partitionNum, numberOfShards, (List) null, this.jsonMapper));
            }
        }
        return shardSpecs;
    }

    /**
     * Builds and pushes the single segment for one (interval, shard spec) pair.
     *
     * <p>The input is read once more through a fresh firehose. Rows rejected by {@code shouldIndex}
     * are counted as thrown away, all other rows are added to a plumber created by
     * {@link YeOldePlumberSchool}. After the last row the plumber is persisted and finished, a
     * summary line is logged (also when finishing fails) and the one segment that went through the
     * wrapping pusher is returned. The firehose is closed on every exit path, including failures
     * while the committer supplier or the plumber are being created.
     *
     * @param taskToolbox source of the work directory, segment pusher, index mergers and config
     * @param dataSchema  schema handed to the plumber
     * @param shardSpec   shard spec of the segment to build
     * @param interval    interval of the segment to build
     * @param version     segment version
     * @return the pushed segment
     * @throws IOException if reading or closing the firehose fails
     * @throws ISE         if the plumber reports {@code -1} when a row is added
     */
    private DataSegment generateSegment(final TaskToolbox taskToolbox, DataSchema dataSchema, ShardSpec shardSpec, Interval interval, String version) throws IOException {
        final File tmpDir = new File(
                taskToolbox.getTaskWorkDir(),
                String.format("%s_%s_%s_%s_%s", getDataSource(), interval.getStart(), interval.getEnd(), version, shardSpec.getPartitionNum()));
        final FirehoseFactory firehoseFactory = this.ingestionSchema.m16getIOConfig().getFirehoseFactory();
        final int rowFlushBoundary = this.ingestionSchema.m15getTuningConfig().getRowFlushBoundary();

        // Remembers every segment that passes through the pusher so it can be returned at the end.
        final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
        final DataSegmentPusher recordingPusher = new DataSegmentPusher() {
            @Override
            public String getPathForHadoop(String dataSource) {
                return taskToolbox.getSegmentPusher().getPathForHadoop(dataSource);
            }

            @Override
            public DataSegment push(File file, DataSegment segment) throws IOException {
                final DataSegment pushed = taskToolbox.getSegmentPusher().push(file, segment);
                pushedSegments.add(pushed);
                return pushed;
            }
        };

        final int effectiveRowFlushBoundary = rowFlushBoundary > 0 ? rowFlushBoundary : taskToolbox.getConfig().getDefaultRowFlushBoundary();
        final FireDepartmentMetrics metrics = new FireDepartmentMetrics();

        // The firehose is opened inside try-with-resources so that it is also closed when building
        // the committer supplier or the plumber fails, and so that a close() failure cannot mask
        // the exception that caused the exit.
        try (Firehose firehose = firehoseFactory.connect(this.ingestionSchema.getDataSchema().getParser())) {
            final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
            final IndexTuningConfig tuningConfig = this.ingestionSchema.m15getTuningConfig();
            final Plumber plumber = new YeOldePlumberSchool(
                    interval,
                    version,
                    recordingPusher,
                    tmpDir,
                    taskToolbox.getIndexMerger(),
                    taskToolbox.getIndexMergerV9(),
                    taskToolbox.getIndexIO()
            ).findPlumber(
                    dataSchema,
                    convertTuningConfig(shardSpec, effectiveRowFlushBoundary, tuningConfig.getIndexSpec(), tuningConfig.getBuildV9Directly()),
                    metrics);
            final QueryGranularity rollupGranularity = this.ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity();

            plumber.startJob();
            while (firehose.hasMore()) {
                final InputRow row = firehose.nextRow();
                if (!shouldIndex(shardSpec, interval, row, rollupGranularity)) {
                    metrics.incrementThrownAway();
                    continue;
                }
                if (plumber.add(row, committerSupplier) == -1) {
                    throw new ISE("Was expecting non-null sink for timestamp[%s]", new DateTime(row.getTimestampFromEpoch()));
                }
                metrics.incrementProcessed();
            }
            plumber.persist(committerSupplier.get());

            try {
                plumber.finishJob();
            } finally {
                // Logged whether or not finishJob() succeeded.
                final long processed = metrics.processed();
                final long unparseable = metrics.unparseable();
                final long thrownAway = metrics.thrownAway();
                log.info(
                        "Task[%s] interval[%s] partition[%d] took in %,d rows (%,d processed, %,d unparseable, %,d thrown away) and output %,d rows",
                        getId(),
                        interval,
                        shardSpec.getPartitionNum(),
                        processed + unparseable + thrownAway,
                        processed,
                        unparseable,
                        thrownAway,
                        metrics.rowOutput());
            }

            // Exactly one segment is expected to have been pushed.
            return Iterables.getOnlyElement(pushedSegments);
        }
    }
}
