package io.druid.indexing.common.index;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.Segment;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;

@JsonTypeName("historical")
/* loaded from: input_file:io/druid/indexing/common/index/YeOldePlumberSchool.class */
public class YeOldePlumberSchool implements PlumberSchool {
    private final Interval interval;
    private final String version;
    private final DataSegmentPusher dataSegmentPusher;
    private final File tmpSegmentDir;
    private static final Logger log = new Logger(YeOldePlumberSchool.class);

    @JsonCreator
    public YeOldePlumberSchool(@JsonProperty("interval") Interval interval, @JsonProperty("version") String str, @JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher, @JacksonInject("tmpSegmentDir") File file) {
        this.interval = interval;
        this.version = str;
        this.dataSegmentPusher = dataSegmentPusher;
        this.tmpSegmentDir = file;
    }

    public Plumber findPlumber(final DataSchema dataSchema, final RealtimeTuningConfig realtimeTuningConfig, final FireDepartmentMetrics fireDepartmentMetrics) {
        final Sink sink = new Sink(this.interval, dataSchema, realtimeTuningConfig, this.version);
        final File file = new File(this.tmpSegmentDir, sink.getSegment().getIdentifier());
        final HashSet newHashSet = Sets.newHashSet();
        return new Plumber() { // from class: io.druid.indexing.common.index.YeOldePlumberSchool.1
            public Object startJob() {
                return null;
            }

            public int add(InputRow inputRow, Supplier<Committer> supplier) throws IndexSizeExceededException {
                Sink sink2 = getSink(inputRow.getTimestampFromEpoch());
                if (sink2 == null) {
                    return -1;
                }
                int add = sink2.add(inputRow);
                if (!sink2.canAppendRow()) {
                    persist((Committer) supplier.get());
                }
                return add;
            }

            private Sink getSink(long j) {
                if (sink.getInterval().contains(j)) {
                    return sink;
                }
                return null;
            }

            public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
                throw new UnsupportedOperationException("Don't query me, bro.");
            }

            public void persist(Committer committer) {
                spillIfSwappable();
                committer.run();
            }

            public void finishJob() {
                File file2;
                try {
                    try {
                        Preconditions.checkState(!sink.swappable(), "All data must be persisted before fininshing the job!");
                        if (newHashSet.size() == 0) {
                            throw new IllegalStateException("Nothing indexed?");
                        }
                        if (newHashSet.size() == 1) {
                            file2 = (File) Iterables.getOnlyElement(newHashSet);
                        } else {
                            ArrayList newArrayList = Lists.newArrayList();
                            Iterator it = newHashSet.iterator();
                            while (it.hasNext()) {
                                newArrayList.add(IndexIO.loadIndex((File) it.next()));
                            }
                            file2 = new File(YeOldePlumberSchool.this.tmpSegmentDir, "merged");
                            IndexMerger.mergeQueryableIndex(newArrayList, dataSchema.getAggregators(), file2, realtimeTuningConfig.getIndexSpec());
                        }
                        DataSegment withBinaryVersion = sink.getSegment().withDimensions(ImmutableList.copyOf(IndexIO.loadIndex(file2).getAvailableDimensions())).withBinaryVersion(SegmentUtils.getVersionFromDir(file2));
                        YeOldePlumberSchool.this.dataSegmentPusher.push(file2, withBinaryVersion);
                        YeOldePlumberSchool.log.info("Uploaded segment[%s]", new Object[]{withBinaryVersion.getIdentifier()});
                        if (file2 != null) {
                            try {
                                YeOldePlumberSchool.log.info("Deleting Index File[%s]", new Object[]{file2});
                                FileUtils.deleteDirectory(file2);
                            } catch (IOException e) {
                                YeOldePlumberSchool.log.warn(e, "Error deleting directory[%s]", new Object[]{file2});
                            }
                        }
                    } catch (Exception e2) {
                        YeOldePlumberSchool.log.warn(e2, "Failed to merge and upload", new Object[0]);
                        throw Throwables.propagate(e2);
                    }
                } catch (Throwable th) {
                    if (0 != 0) {
                        try {
                            YeOldePlumberSchool.log.info("Deleting Index File[%s]", new Object[]{null});
                            FileUtils.deleteDirectory((File) null);
                        } catch (IOException e3) {
                            YeOldePlumberSchool.log.warn(e3, "Error deleting directory[%s]", new Object[]{null});
                            throw th;
                        }
                    }
                    throw th;
                }
            }

            private void spillIfSwappable() {
                if (sink.swappable()) {
                    FireHydrant swap = sink.swap();
                    int size = swap.getIndex().size();
                    File spillDir = getSpillDir(swap.getCount());
                    YeOldePlumberSchool.log.info("Spilling index[%d] with rows[%d] to: %s", new Object[]{Integer.valueOf(swap.getCount()), Integer.valueOf(size), spillDir});
                    try {
                        IndexMerger.persist(swap.getIndex(), spillDir, (Map) null, realtimeTuningConfig.getIndexSpec());
                        swap.swapSegment((Segment) null);
                        fireDepartmentMetrics.incrementRowOutputCount(size);
                        newHashSet.add(spillDir);
                    } catch (Exception e) {
                        YeOldePlumberSchool.log.warn(e, "Failed to spill index[%d]", new Object[]{Integer.valueOf(swap.getCount())});
                        throw Throwables.propagate(e);
                    }
                }
            }

            private File getSpillDir(int i) {
                return new File(file, String.format("spill%d", Integer.valueOf(i)));
            }
        };
    }
}
