package io.druid.indexing.kafka;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.common.RetryUtils;
import com.metamx.common.guava.Sequence;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.Committer;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.DateTime;

/* loaded from: input_file:io/druid/indexing/kafka/KafkaIndexTask.class */
public class KafkaIndexTask extends AbstractTask implements ChatHandler {
    // Sentinel pause duration; NOTE(review): presumably means "pause with no timeout" — its use is outside this chunk, confirm.
    public static final long PAUSE_FOREVER = -1;
    // Task type string returned by getType() and used as the prefix of generated task ids.
    private static final String TYPE = "index_kafka";
    // NOTE(review): presumably the Kafka consumer poll timeout in milliseconds — the poll call is not visible here, confirm.
    private static final long POLL_TIMEOUT = 100;
    // Key under which the committer metadata stores the next partition offsets, and from which run() restores them.
    private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
    // Configuration captured (and null-checked) in the constructor.
    private final DataSchema dataSchema;
    private final InputRowParser<ByteBuffer> parser;
    private final KafkaTuningConfig tuningConfig;
    private final KafkaIOConfig ioConfig;
    // Wraps the injected chat handler provider; absent when none was injected.
    private final Optional<ChatHandlerProvider> chatHandlerProvider;
    // Partition id -> offset; seeded in the constructor from ioConfig.getEndPartitions().
    private final Map<Integer, Long> endOffsets;
    // Partition id -> offset; filled in run() from the start partitions or from restored committer metadata.
    private final Map<Integer, Long> nextOffsets;
    // Set from the TaskToolbox at the start of run().
    private ObjectMapper mapper;
    // The fields below are assigned inside run() and declared volatile.
    private volatile Appenderator appenderator;
    private volatile FireDepartmentMetrics fireDepartmentMetrics;
    // Set to DateTime.now() when run() begins.
    private volatile DateTime startTime;
    // Current lifecycle state; NOT_STARTED after construction, STARTING once run() begins.
    private volatile Status status;
    // The thread executing run().
    private volatile Thread runThread;
    // Checked in run() when an InterruptedException arrives: if set, the interrupt is treated as a requested stop.
    private volatile boolean stopRequested;
    // NOTE(review): presumably selects whether a stop still publishes segments — usage is outside this chunk, confirm.
    private volatile boolean publishOnStop;
    // Lock and its two conditions, created together in the constructor; the pause/resume logic using them is outside this chunk.
    private final Lock pauseLock;
    private final Condition hasPaused;
    private final Condition shouldResume;
    private volatile boolean pauseRequested;
    private volatile long pauseMillis;
    private static final Logger log = new Logger(KafkaIndexTask.class);
    // Source of the random int that makeTaskId() turns into an id suffix when no task id is supplied.
    private static final Random RANDOM = new Random();

    /* loaded from: input_file:io/druid/indexing/kafka/KafkaIndexTask$Status.class */
    /**
     * Lifecycle states reported by this task.
     *
     * <p>The constructor initializes the task to {@link #NOT_STARTED} and {@code run()} switches it to
     * {@link #STARTING} before doing any work. The transitions into the remaining states happen in code
     * outside this view; their names suggest reading from Kafka, being paused, and publishing segments.
     */
    public enum Status {
        NOT_STARTED,
        STARTING,
        READING,
        PAUSED,
        PUBLISHING
    }

    /**
     * Builds a Kafka indexing task from its (typically JSON-deserialized) specification.
     *
     * @param str                 task id; when {@code null}, an id is generated by {@code makeTaskId} from the
     *                            data source name and a random int
     * @param taskResource        task resource, passed through to the superclass
     * @param dataSchema          schema of the data to index; must be non-null and carry a non-null parser
     * @param kafkaTuningConfig   tuning configuration; must be non-null
     * @param kafkaIOConfig       I/O configuration; must be non-null. Its end partitions seed the end offsets.
     * @param map                 task context, passed through to the superclass
     * @param chatHandlerProvider injected chat handler provider; may be {@code null}
     * @throws NullPointerException if {@code dataSchema}, its parser, {@code kafkaTuningConfig} or
     *                              {@code kafkaIOConfig} is {@code null}
     */
    @JsonCreator
    public KafkaIndexTask(
            @JsonProperty("id") String str,
            @JsonProperty("resource") TaskResource taskResource,
            @JsonProperty("dataSchema") DataSchema dataSchema,
            @JsonProperty("tuningConfig") KafkaTuningConfig kafkaTuningConfig,
            @JsonProperty("ioConfig") KafkaIOConfig kafkaIOConfig,
            @JsonProperty("context") Map<String, Object> map,
            @JacksonInject ChatHandlerProvider chatHandlerProvider
    ) {
        super(
                str == null ? makeTaskId(dataSchema.getDataSource(), RANDOM.nextInt()) : str,
                String.format("%s_%s", TYPE, dataSchema.getDataSource()),
                taskResource,
                dataSchema.getDataSource(),
                map
        );

        // Offset bookkeeping shared between the run thread and other callers.
        this.endOffsets = new ConcurrentHashMap<>();
        this.nextOffsets = new ConcurrentHashMap<>();

        // Pause/resume coordination: one lock, two conditions.
        this.pauseLock = new ReentrantLock();
        this.hasPaused = this.pauseLock.newCondition();
        this.shouldResume = this.pauseLock.newCondition();
        this.pauseRequested = false;
        this.pauseMillis = 0L;

        // Runtime state that run() fills in later.
        this.status = Status.NOT_STARTED;
        this.appenderator = null;
        this.fireDepartmentMetrics = null;
        this.runThread = null;
        this.stopRequested = false;
        this.publishOnStop = false;

        // Validate and capture the configuration, in the same order as before.
        this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
        // The raw cast is kept: the parser's declared type argument is not visible from here.
        this.parser = (InputRowParser) Preconditions.checkNotNull(dataSchema.getParser(), "parser");
        this.tuningConfig = Preconditions.checkNotNull(kafkaTuningConfig, "tuningConfig");
        this.ioConfig = Preconditions.checkNotNull(kafkaIOConfig, "ioConfig");
        this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);

        this.endOffsets.putAll(kafkaIOConfig.getEndPartitions().getPartitionOffsetMap());
    }

    /**
     * Generates a task id of the form {@code index_kafka_<dataSource>_<suffix>}.
     *
     * <p>The suffix is eight characters long: each 4-bit nibble of {@code i}, lowest nibble first, is
     * mapped onto the letters {@code 'a'} through {@code 'p'}.
     *
     * @param str data source name placed in the middle of the id
     * @param i   integer whose 32 bits are encoded into the suffix
     * @return the three parts joined with underscores
     */
    private static String makeTaskId(String str, int i) {
        final char[] suffix = new char[8];
        int remaining = i;
        for (int pos = 0; pos < suffix.length; pos++) {
            // Consume the lowest nibble, then shift the next one into place.
            suffix[pos] = (char) ('a' + (remaining & 0xF));
            remaining >>>= 4;
        }
        return Joiner.on("_").join(TYPE, str, new String(suffix));
    }

    /**
     * Returns the type identifier of this task.
     *
     * @return the constant {@code TYPE}, i.e. {@code "index_kafka"}
     */
    public String getType() {
        return TYPE;
    }

    /**
     * Reports whether this task is ready to run.
     *
     * @param taskActionClient not used by this implementation
     * @return always {@code true}
     * @throws Exception declared by the signature; this implementation never throws
     */
    public boolean isReady(TaskActionClient taskActionClient) throws Exception {
        return true;
    }

    /**
     * Returns the data schema supplied at construction; exposed as a JSON property.
     *
     * @return the non-null {@link DataSchema} of this task
     */
    @JsonProperty
    public DataSchema getDataSchema() {
        return this.dataSchema;
    }

    /**
     * Returns the tuning configuration supplied at construction; exposed as a JSON property.
     *
     * @return the non-null {@link KafkaTuningConfig} of this task
     */
    @JsonProperty
    public KafkaTuningConfig getTuningConfig() {
        return this.tuningConfig;
    }

    /**
     * Returns the I/O configuration supplied at construction; serialized under the JSON name
     * {@code "ioConfig"}.
     *
     * @return the non-null {@link KafkaIOConfig} of this task
     */
    @JsonProperty("ioConfig")
    public KafkaIOConfig getIOConfig() {
        return this.ioConfig;
    }

    /* JADX WARN: Failed to calculate best type for var: r14v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r14v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r15v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r15v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r16v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r16v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r17v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r17v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r18v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r18v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r19v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r19v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 14, insn: 0x064b: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r14 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:184:0x064b */
    /* JADX WARN: Not initialized variable reg: 15, insn: 0x064f: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r15 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:186:0x064f */
    /* JADX WARN: Not initialized variable reg: 16, insn: 0x05f3: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r16 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:171:0x05f3 */
    /* JADX WARN: Not initialized variable reg: 17, insn: 0x05f8: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r17 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:173:0x05f8 */
    /* JADX WARN: Not initialized variable reg: 18, insn: 0x059c: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r18 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:155:0x059c */
    /* JADX WARN: Not initialized variable reg: 19, insn: 0x05a1: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r19 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:157:0x05a1 */
    /* JADX WARN: Type inference failed for: r14v1, types: [io.druid.segment.realtime.appenderator.Appenderator] */
    /* JADX WARN: Type inference failed for: r15v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r16v0, types: [io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver] */
    /* JADX WARN: Type inference failed for: r17v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r18v0, types: [org.apache.kafka.clients.consumer.KafkaConsumer] */
    /* JADX WARN: Type inference failed for: r19v0, types: [java.lang.Throwable] */
    public TaskStatus run(final TaskToolbox taskToolbox) throws Exception {
        ?? r14;
        ?? r15;
        Appenderator newAppenderator;
        Throwable th;
        ?? r16;
        ?? r17;
        FiniteAppenderatorDriver newDriver;
        Throwable th2;
        ?? r18;
        ?? r19;
        log.info("Starting up!", new Object[0]);
        this.startTime = DateTime.now();
        this.mapper = taskToolbox.getObjectMapper();
        this.status = Status.STARTING;
        if (this.chatHandlerProvider.isPresent()) {
            log.info("Found chat handler of class[%s]", new Object[]{((ChatHandlerProvider) this.chatHandlerProvider.get()).getClass().getName()});
            ((ChatHandlerProvider) this.chatHandlerProvider.get()).register(getId(), this);
        } else {
            log.warn("No chat handler detected", new Object[0]);
        }
        this.runThread = Thread.currentThread();
        FireDepartment fireDepartment = new FireDepartment(this.dataSchema, new RealtimeIOConfig((FirehoseFactory) null, (PlumberSchool) null, (FirehoseFactoryV2) null), (RealtimeTuningConfig) null);
        this.fireDepartmentMetrics = fireDepartment.getMetrics();
        taskToolbox.getMonitorScheduler().addMonitor(new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment), ImmutableMap.of("taskId", new String[]{getId()})));
        try {
            try {
                newAppenderator = newAppenderator(this.fireDepartmentMetrics, taskToolbox);
                th = null;
                try {
                    newDriver = newDriver(newAppenderator, taskToolbox);
                    th2 = null;
                } catch (Throwable th3) {
                    if (r16 != 0) {
                        if (r17 != 0) {
                            try {
                                r16.close();
                            } catch (Throwable th4) {
                                r17.addSuppressed(th4);
                            }
                        } else {
                            r16.close();
                        }
                    }
                    throw th3;
                }
            } catch (Throwable th5) {
                if (r14 != 0) {
                    if (r15 != 0) {
                        try {
                            r14.close();
                        } catch (Throwable th6) {
                            r15.addSuppressed(th6);
                        }
                    } else {
                        r14.close();
                    }
                }
                throw th5;
            }
        } catch (InterruptedException e) {
            if (!this.stopRequested) {
                Thread.currentThread().interrupt();
                throw e;
            }
            log.info("The task was asked to stop before completing", new Object[0]);
        }
        try {
            final KafkaConsumer<byte[], byte[]> newConsumer = newConsumer();
            Throwable th7 = null;
            this.appenderator = newAppenderator;
            String topic = this.ioConfig.getStartPartitions().getTopic();
            Object startJob = newDriver.startJob();
            if (startJob == null) {
                this.nextOffsets.putAll(this.ioConfig.getStartPartitions().getPartitionOffsetMap());
            } else {
                KafkaPartitions kafkaPartitions = (KafkaPartitions) taskToolbox.getObjectMapper().convertValue(((Map) startJob).get(METADATA_NEXT_PARTITIONS), KafkaPartitions.class);
                this.nextOffsets.putAll(kafkaPartitions.getPartitionOffsetMap());
                if (!kafkaPartitions.getTopic().equals(this.ioConfig.getStartPartitions().getTopic())) {
                    throw new ISE("WTF?! Restored topic[%s] but expected topic[%s]", new Object[]{kafkaPartitions.getTopic(), this.ioConfig.getStartPartitions().getTopic()});
                }
                if (!this.nextOffsets.keySet().equals(this.ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) {
                    throw new ISE("WTF?! Restored partitions[%s] but expected partitions[%s]", new Object[]{this.nextOffsets.keySet(), this.ioConfig.getStartPartitions().getPartitionOffsetMap().keySet()});
                }
            }
            HashMap newHashMap = Maps.newHashMap();
            for (Integer num : this.nextOffsets.keySet()) {
                newHashMap.put(num, String.format("%s_%s", this.ioConfig.getBaseSequenceName(), num));
            }
            Supplier<Committer> supplier = new Supplier<Committer>() { // from class: io.druid.indexing.kafka.KafkaIndexTask.1
                /* renamed from: get, reason: merged with bridge method [inline-methods] */
                public Committer m1get() {
                    final ImmutableMap copyOf = ImmutableMap.copyOf(KafkaIndexTask.this.nextOffsets);
                    return new Committer() { // from class: io.druid.indexing.kafka.KafkaIndexTask.1.1
                        public Object getMetadata() {
                            return ImmutableMap.of(KafkaIndexTask.METADATA_NEXT_PARTITIONS, new KafkaPartitions(KafkaIndexTask.this.ioConfig.getStartPartitions().getTopic(), copyOf));
                        }

                        public void run() {
                        }
                    };
                }
            };
            Set<Integer> assignPartitionsAndSeekToNext = assignPartitionsAndSeekToNext(newConsumer, topic);
            boolean z = !assignPartitionsAndSeekToNext.isEmpty();
            loop1: while (z) {
                try {
                    if (possiblyPause(assignPartitionsAndSeekToNext)) {
                        assignPartitionsAndSeekToNext = assignPartitionsAndSeekToNext(newConsumer, topic);
                        if (assignPartitionsAndSeekToNext.isEmpty()) {
                            log.info("All partitions have been fully read", new Object[0]);
                            this.publishOnStop = true;
                            this.stopRequested = true;
                        }
                    }
                    if (this.stopRequested) {
                        break;
                    }
                    Iterator it = ((ConsumerRecords) RetryUtils.retry(new Callable<ConsumerRecords<byte[], byte[]>>() { // from class: io.druid.indexing.kafka.KafkaIndexTask.2
                        /* JADX WARN: Can't rename method to resolve collision */
                        @Override // java.util.concurrent.Callable
                        public ConsumerRecords<byte[], byte[]> call() throws Exception {
                            try {
                                return newConsumer.poll(KafkaIndexTask.POLL_TIMEOUT);
                            } finally {
                                KafkaIndexTask.this.status = Status.READING;
                            }
                        }
                    }, new Predicate<Throwable>() { // from class: io.druid.indexing.kafka.KafkaIndexTask.3
                        public boolean apply(Throwable th8) {
                            return th8 instanceof OffsetOutOfRangeException;
                        }
                    }, Integer.MAX_VALUE)).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        if (log.isTraceEnabled()) {
                            log.trace("Got topic[%s] partition[%d] offset[%,d].", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset())});
                        }
                        if (consumerRecord.offset() < this.endOffsets.get(Integer.valueOf(consumerRecord.partition())).longValue()) {
                            if (consumerRecord.offset() != this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())).longValue()) {
                                throw new ISE("WTF?! Got offset[%,d] after offset[%,d] in partition[%d].", new Object[]{Long.valueOf(consumerRecord.offset()), this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())), Integer.valueOf(consumerRecord.partition())});
                            }
                            try {
                                InputRow inputRow = (InputRow) Preconditions.checkNotNull(this.parser.parse(ByteBuffer.wrap((byte[]) consumerRecord.value())), "row");
                                if (this.ioConfig.getMinimumMessageTime().isPresent() && ((DateTime) this.ioConfig.getMinimumMessageTime().get()).isAfter(inputRow.getTimestamp())) {
                                    this.fireDepartmentMetrics.incrementThrownAway();
                                } else {
                                    if (newDriver.add(inputRow, (String) newHashMap.get(Integer.valueOf(consumerRecord.partition())), supplier) == null) {
                                        throw new ISE("Could not allocate segment for row with timestamp[%s]", new Object[]{inputRow.getTimestamp()});
                                        break loop1;
                                    }
                                    this.fireDepartmentMetrics.incrementProcessed();
                                }
                            } catch (ParseException e2) {
                                if (this.tuningConfig.isReportParseExceptions()) {
                                    throw e2;
                                }
                                log.debug(e2, "Dropping unparseable row from partition[%d] offset[%,d].", new Object[]{Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset())});
                                this.fireDepartmentMetrics.incrementUnparseable();
                            }
                            this.nextOffsets.put(Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset() + 1));
                        }
                        if (this.nextOffsets.get(Integer.valueOf(consumerRecord.partition())).equals(this.endOffsets.get(Integer.valueOf(consumerRecord.partition()))) && assignPartitionsAndSeekToNext.remove(Integer.valueOf(consumerRecord.partition()))) {
                            log.info("Finished reading topic[%s], partition[%,d].", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition())});
                            assignPartitions(newConsumer, topic, assignPartitionsAndSeekToNext);
                            z = this.ioConfig.isPauseAfterRead() || !assignPartitionsAndSeekToNext.isEmpty();
                        }
                    }
                } catch (Throwable th8) {
                    newDriver.persist((Committer) supplier.get());
                    throw th8;
                }
            }
            newDriver.persist((Committer) supplier.get());
            if (this.stopRequested && !this.publishOnStop) {
                throw new InterruptedException("Stopping without publishing");
            }
            this.status = Status.PUBLISHING;
            SegmentsAndMetadata finish = newDriver.finish(new TransactionalSegmentPublisher() { // from class: io.druid.indexing.kafka.KafkaIndexTask.4
                public boolean publishSegments(Set<DataSegment> set, Object obj) throws IOException {
                    KafkaPartitions kafkaPartitions2 = (KafkaPartitions) taskToolbox.getObjectMapper().convertValue(((Map) obj).get(KafkaIndexTask.METADATA_NEXT_PARTITIONS), KafkaPartitions.class);
                    if (!KafkaIndexTask.this.endOffsets.equals(kafkaPartitions2.getPartitionOffsetMap())) {
                        throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", new Object[]{obj});
                    }
                    SegmentTransactionalInsertAction segmentTransactionalInsertAction = KafkaIndexTask.this.ioConfig.isUseTransaction() ? new SegmentTransactionalInsertAction(set, new KafkaDataSourceMetadata(KafkaIndexTask.this.ioConfig.getStartPartitions()), new KafkaDataSourceMetadata(kafkaPartitions2)) : new SegmentTransactionalInsertAction(set, (DataSourceMetadata) null, (DataSourceMetadata) null);
                    KafkaIndexTask.log.info("Publishing with isTransaction[%s].", new Object[]{Boolean.valueOf(KafkaIndexTask.this.ioConfig.isUseTransaction())});
                    return ((SegmentPublishResult) taskToolbox.getTaskActionClient().submit(segmentTransactionalInsertAction)).isSuccess();
                }
            }, (Committer) supplier.get());
            if (finish == null) {
                throw new ISE("Transaction failure publishing segments, aborting", new Object[0]);
            }
            log.info("Published segments[%s] with metadata[%s].", new Object[]{Joiner.on(", ").join(Iterables.transform(finish.getSegments(), new Function<DataSegment, String>() { // from class: io.druid.indexing.kafka.KafkaIndexTask.5
                public String apply(DataSegment dataSegment) {
                    return dataSegment.getIdentifier();
                }
            })), finish.getCommitMetadata()});
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th9) {
                        th7.addSuppressed(th9);
                    }
                } else {
                    newConsumer.close();
                }
            }
            if (newDriver != null) {
                if (0 != 0) {
                    try {
                        newDriver.close();
                    } catch (Throwable th10) {
                        th2.addSuppressed(th10);
                    }
                } else {
                    newDriver.close();
                }
            }
            if (newAppenderator != null) {
                if (0 != 0) {
                    try {
                        newAppenderator.close();
                    } catch (Throwable th11) {
                        th.addSuppressed(th11);
                    }
                } else {
                    newAppenderator.close();
                }
            }
            return success();
        } catch (Throwable th12) {
            if (r18 != 0) {
                if (r19 != 0) {
                    try {
                        r18.close();
                    } catch (Throwable th13) {
                        r19.addSuppressed(th13);
                    }
                } else {
                    r18.close();
                }
            }
            throw th12;
        }
    }

    /**
     * Reports whether this task can be restored.
     *
     * @return always {@code true}
     */
    public boolean canRestore() {
        return true;
    }

    /**
     * Handles {@code POST /stop}: marks the task as stop-requested and, when the run thread is
     * still alive, interrupts it so that it notices the request.
     */
    @POST
    @Path("/stop")
    public void stopGracefully() {
        log.info("Stopping gracefully.");
        stopRequested = true;
        if (!runThread.isAlive()) {
            // Nothing left to interrupt.
            return;
        }
        log.info("Interrupting run thread (status: [%s])", status);
        runThread.interrupt();
    }

    /**
     * Builds a runner for queries against the data held by this task.
     *
     * @param query the query a runner is requested for (not inspected here)
     * @return a {@link NoopQueryRunner} while no appenderator has been set; otherwise a runner
     *         that executes the query it is handed against this task's appenderator
     */
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        if (appenderator == null) {
            return new NoopQueryRunner<T>();
        }
        return new QueryRunner<T>() {
            public Sequence<T> run(Query<T> incoming, Map<String, Object> context) {
                // The field is read at execution time, not captured when the runner is created.
                return incoming.run(KafkaIndexTask.this.appenderator, context);
            }
        };
    }

    /**
     * Handles {@code GET /status}.
     *
     * @return the task's current {@link Status}, produced as JSON
     */
    @GET
    @Produces({"application/json"})
    @Path("/status")
    public Status getStatus() {
        return status;
    }

    /**
     * Handles {@code GET /offsets/current}.
     *
     * @return the partition-to-next-offset map held by this task (the map itself, not a copy)
     */
    @GET
    @Produces({"application/json"})
    @Path("/offsets/current")
    public Map<Integer, Long> getCurrentOffsets() {
        return nextOffsets;
    }

    /**
     * Handles {@code GET /offsets/end}.
     *
     * @return the partition-to-end-offset map held by this task (the map itself, not a copy)
     */
    @GET
    @Produces({"application/json"})
    @Path("/offsets/end")
    public Map<Integer, Long> getEndOffsets() {
        return endOffsets;
    }

    /**
     * Handles {@code POST /offsets/end}: replaces the end offsets of some or all partitions while
     * the task is paused, optionally resuming ingestion afterwards.
     *
     * <p>The request is rejected with 400 BAD_REQUEST when the body is missing, when it names a
     * partition that is not a key of this task's end-offset map, when the task is not paused, or
     * when a requested end offset is lower than the partition's current offset.
     *
     * @param map new end offsets keyed by partition
     * @param z   value of the {@code resume} query parameter; when {@code true}, {@link #resume()}
     *            is called after the offsets have been updated
     * @return 200 OK with the updated end-offset map, or 400 BAD_REQUEST as described above
     * @throws InterruptedException if interrupted while acquiring the pause lock or while resuming
     */
    @Path("/offsets/end")
    @Consumes({"application/json"})
    @POST
    @Produces({"application/json"})
    public Response setEndOffsets(Map<Integer, Long> map, @QueryParam("resume") @DefaultValue("false") boolean z) throws InterruptedException {
        if (map == null) {
            return Response.status(Response.Status.BAD_REQUEST).entity("Request body must contain a map of { partition:endOffset }").build();
        }
        if (!this.endOffsets.keySet().containsAll(map.keySet())) {
            return Response.status(Response.Status.BAD_REQUEST).entity(String.format("Request contains partitions not being handled by this task, my partitions: %s", this.endOffsets.keySet())).build();
        }

        this.pauseLock.lockInterruptibly();
        try {
            if (!isPaused()) {
                return Response.status(Response.Status.BAD_REQUEST).entity("Task must be paused before changing the end offsets").build();
            }
            for (Map.Entry<Integer, Long> entry : map.entrySet()) {
                Long current = this.nextOffsets.get(entry.getKey());
                if (entry.getValue().compareTo(current) < 0) {
                    return Response.status(Response.Status.BAD_REQUEST).entity(String.format("End offset must be >= current offset for partition [%s] (current: %s)", entry.getKey(), current)).build();
                }
            }
            this.endOffsets.putAll(map);
            log.info("endOffsets changed to %s", this.endOffsets);
        } finally {
            // Single release point: the lock is unlocked exactly once on every exit path.
            this.pauseLock.unlock();
        }

        // resume() acquires the pause lock itself, so it runs only after the lock has been
        // released. A failure in resume() must not trigger a second unlock of a lock this
        // thread no longer holds, which would mask the real exception.
        if (z) {
            resume();
        }
        return Response.ok(this.endOffsets).build();
    }

    /**
     * Handles {@code POST /pause}: requests that the ingestion loop pause and waits up to two
     * seconds for it to report that it has done so.
     *
     * @param j value of the {@code timeout} query parameter; a value {@code <= 0} is stored as
     *          {@code -1}, any other value is stored unchanged as the pause duration
     * @return 400 BAD_REQUEST when the task status is neither PAUSED nor READING; 202 ACCEPTED
     *         when the task has not paused within the wait; otherwise 200 OK whose entity is the
     *         current offsets serialized to a JSON string
     * @throws InterruptedException if interrupted while acquiring the pause lock or while waiting
     */
    @POST
    @Produces({"application/json"})
    @Path("/pause")
    public Response pause(@QueryParam("timeout") @DefaultValue("0") long j) throws InterruptedException {
        if (this.status != Status.PAUSED && this.status != Status.READING) {
            return Response.status(Response.Status.BAD_REQUEST).entity(String.format("Can't pause, task is not in a pausable state (state: [%s])", this.status)).build();
        }
        this.pauseLock.lockInterruptibly();
        try {
            this.pauseMillis = j <= 0 ? -1L : j;
            this.pauseRequested = true;
            if (isPaused()) {
                // Already paused: wake the waiting loop so it picks up the new pauseMillis.
                this.shouldResume.signalAll();
            }
            long nanos = TimeUnit.SECONDS.toNanos(2L);
            while (!isPaused()) {
                if (nanos <= 0) {
                    // The finally block releases the lock; unlocking here as well would release
                    // it twice and fail instead of returning this response.
                    return Response.status(Response.Status.ACCEPTED).entity("Request accepted but task has not yet paused").build();
                }
                nanos = this.hasPaused.awaitNanos(nanos);
            }
            try {
                return Response.ok().entity(this.mapper.writeValueAsString(getCurrentOffsets())).build();
            } catch (JsonProcessingException e) {
                throw Throwables.propagate(e);
            }
        } finally {
            this.pauseLock.unlock();
        }
    }

    /**
     * Handles {@code POST /resume}: clears the pause request, signals the paused ingestion loop,
     * and waits up to five seconds for the task to leave the paused state.
     *
     * @throws InterruptedException if interrupted while acquiring the pause lock or while waiting
     * @throws RuntimeException     if the task is still paused once the five seconds have elapsed
     */
    @POST
    @Path("/resume")
    public void resume() throws InterruptedException {
        pauseLock.lockInterruptibly();
        try {
            pauseRequested = false;
            shouldResume.signalAll();

            // awaitNanos returns the time still remaining, so the budget shrinks on each wake-up.
            for (long remaining = TimeUnit.SECONDS.toNanos(5L);
                 isPaused();
                 remaining = shouldResume.awaitNanos(remaining)) {
                if (remaining <= 0) {
                    throw new RuntimeException("Resume command was not accepted within 5 seconds");
                }
            }
        } finally {
            pauseLock.unlock();
        }
    }

    /**
     * Handles {@code GET /time/start}.
     *
     * @return the value of this task's {@code startTime} field, produced as JSON
     */
    @GET
    @Produces({"application/json"})
    @Path("/time/start")
    public DateTime getStartTime() {
        return startTime;
    }

    /**
     * Exposes the metrics object of this task; package-private and marked as visible for testing.
     *
     * @return the {@code fireDepartmentMetrics} field
     */
    @VisibleForTesting
    FireDepartmentMetrics getFireDepartmentMetrics() {
        return fireDepartmentMetrics;
    }

    /**
     * @return {@code true} when the task status is currently {@link Status#PAUSED}
     */
    private boolean isPaused() {
        return Status.PAUSED == status;
    }

    /**
     * Creates the realtime appenderator for this task.
     *
     * <p>Persisted data goes to a {@code persist} directory under the task work directory. The
     * index merger is chosen by the tuning config's {@code getBuildV9Directly()} flag.
     *
     * @param fireDepartmentMetrics metrics object handed to the appenderator
     * @param taskToolbox           source of the services the appenderator is built from
     * @return the appenderator returned by {@code Appenderators.createRealtime}
     */
    private Appenderator newAppenderator(FireDepartmentMetrics fireDepartmentMetrics, TaskToolbox taskToolbox) {
        final File persistDir = new File(taskToolbox.getTaskWorkDir(), "persist");
        return Appenderators.createRealtime(
                dataSchema,
                tuningConfig.withBasePersistDirectory(persistDir),
                fireDepartmentMetrics,
                taskToolbox.getSegmentPusher(),
                taskToolbox.getObjectMapper(),
                taskToolbox.getIndexIO(),
                tuningConfig.getBuildV9Directly() ? taskToolbox.getIndexMergerV9() : taskToolbox.getIndexMerger(),
                taskToolbox.getQueryRunnerFactoryConglomerate(),
                taskToolbox.getSegmentAnnouncer(),
                taskToolbox.getEmitter(),
                taskToolbox.getQueryExecutorService(),
                taskToolbox.getCache(),
                taskToolbox.getCacheConfig()
        );
    }

    /**
     * Creates the driver that feeds rows into the given appenderator.
     *
     * <p>Segment allocation and used-segment checks both go through the toolbox's task action
     * client. The rows-per-segment limit and handoff timeout come from the tuning config.
     *
     * @param appenderator appenderator the driver wraps
     * @param taskToolbox  source of the services the driver is built from
     * @return a new {@link FiniteAppenderatorDriver}
     */
    private FiniteAppenderatorDriver newDriver(Appenderator appenderator, TaskToolbox taskToolbox) {
        final ActionBasedSegmentAllocator segmentAllocator =
                new ActionBasedSegmentAllocator(taskToolbox.getTaskActionClient(), dataSchema);
        return new FiniteAppenderatorDriver(
                appenderator,
                segmentAllocator,
                taskToolbox.getSegmentHandoffNotifierFactory(),
                new ActionBasedUsedSegmentChecker(taskToolbox.getTaskActionClient()),
                taskToolbox.getObjectMapper(),
                tuningConfig.getMaxRowsPerSegment(),
                tuningConfig.getHandoffConditionTimeout()
        );
    }

    /**
     * Creates a Kafka consumer of raw byte-array keys and values.
     *
     * <p>The consumer properties from the IO config are copied first. The four settings applied
     * afterwards (auto-commit off, offset reset "none", byte-array deserializers) therefore
     * override any value supplied by the user for the same keys.
     *
     * <p>While the consumer is constructed, the thread's context class loader is switched to the
     * loader of this class and restored afterwards, whether or not construction succeeds.
     * NOTE(review): presumably this lets the Kafka client resolve its classes from this
     * extension's class loader — confirm.
     *
     * @return a newly constructed consumer
     */
    private KafkaConsumer<byte[], byte[]> newConsumer() {
        final Thread currentThread = Thread.currentThread();
        final ClassLoader previousLoader = currentThread.getContextClassLoader();
        try {
            currentThread.setContextClassLoader(getClass().getClassLoader());

            final Properties props = new Properties();
            for (Map.Entry<String, String> userProperty : ioConfig.getConsumerProperties().entrySet()) {
                props.setProperty(userProperty.getKey(), userProperty.getValue());
            }

            final String byteArrayDeserializer = ByteArrayDeserializer.class.getName();
            props.setProperty("enable.auto.commit", "false");
            props.setProperty("auto.offset.reset", "none");
            props.setProperty("key.deserializer", byteArrayDeserializer);
            props.setProperty("value.deserializer", byteArrayDeserializer);

            return new KafkaConsumer<>(props);
        } finally {
            currentThread.setContextClassLoader(previousLoader);
        }
    }

    /**
     * Assigns the given partitions of one topic to the consumer.
     *
     * @param kafkaConsumer consumer whose assignment is replaced
     * @param str           topic name
     * @param set           partition numbers to assign
     */
    private static void assignPartitions(KafkaConsumer kafkaConsumer, final String str, Set<Integer> set) {
        final Function<Integer, TopicPartition> toTopicPartition = new Function<Integer, TopicPartition>() {
            public TopicPartition apply(Integer partition) {
                return new TopicPartition(str, partition.intValue());
            }
        };
        // The lazy transform is materialized into a list before it is handed to the consumer.
        kafkaConsumer.assign(Lists.newArrayList(Iterables.transform(set, toTopicPartition)));
    }

    /**
     * Works out which partitions still have data to read, assigns exactly those to the consumer,
     * and seeks each of them to its next offset.
     *
     * <p>A partition is still readable while its next offset is below its end offset. A partition
     * whose next offset equals its end offset is logged as finished and left unassigned.
     *
     * @param kafkaConsumer consumer to assign and seek
     * @param str           topic name
     * @return the partitions that were assigned; empty when every partition is finished
     * @throws ISE if a partition's next offset is already beyond its end offset
     */
    private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer kafkaConsumer, String str) {
        final Set<Integer> unfinished = Sets.newHashSet();
        for (Map.Entry<Integer, Long> next : nextOffsets.entrySet()) {
            final long end = endOffsets.get(next.getKey()).longValue();
            final long current = next.getValue().longValue();
            if (current < end) {
                unfinished.add(next.getKey());
            } else if (current == end) {
                log.info("Finished reading partition[%d].", next.getKey());
            } else {
                throw new ISE("WTF?! Cannot start from offset[%,d] > endOffset[%,d]", next.getValue(), end);
            }
        }

        assignPartitions(kafkaConsumer, str, unfinished);

        for (Integer partition : unfinished) {
            final long offset = nextOffsets.get(partition).longValue();
            log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
            kafkaConsumer.seek(new TopicPartition(str, partition.intValue()), offset);
        }
        return unfinished;
    }

    /**
     * Blocks the calling thread while a pause is in effect.
     *
     * <p>A pause is in effect when one has been requested, or when the IO config asks to pause
     * after reading and {@code set} is empty. In the latter case an indefinite pause
     * ({@code pauseMillis == -1}) is requested here. While paused the status is
     * {@link Status#PAUSED}, and waiters on {@code hasPaused} are signalled once on entry. An
     * indefinite pause waits on {@code shouldResume} until the request is cleared. A timed pause
     * clears the request itself once its time budget is used up. On exit the status becomes
     * {@link Status#READING} and waiters on {@code shouldResume} are signalled.
     *
     * @param set partitions currently assigned for reading
     * @return {@code false} if no pause was requested; {@code true} if the thread paused and has
     *         now resumed
     * @throws InterruptedException if interrupted while acquiring the pause lock or while waiting
     */
    private boolean possiblyPause(Set<Integer> set) throws InterruptedException {
        pauseLock.lockInterruptibly();
        try {
            if (ioConfig.isPauseAfterRead() && set.isEmpty()) {
                pauseMillis = -1L;
                pauseRequested = true;
            }
            if (!pauseRequested) {
                return false;
            }

            status = Status.PAUSED;
            long remainingNanos = 0;
            hasPaused.signalAll();

            while (pauseRequested) {
                if (pauseMillis == -1) {
                    log.info("Pausing ingestion until resumed");
                    shouldResume.await();
                    continue;
                }
                if (pauseMillis > 0) {
                    // A fresh timed pause: convert it to a nanosecond budget and consume the request.
                    log.info("Pausing ingestion for [%,d] ms", pauseMillis);
                    remainingNanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis);
                    pauseMillis = 0L;
                }
                if (remainingNanos <= 0) {
                    // Budget exhausted: clear the request so the loop ends after this pass.
                    pauseRequested = false;
                }
                remainingNanos = shouldResume.awaitNanos(remainingNanos);
            }

            status = Status.READING;
            shouldResume.signalAll();
            log.info("Ingestion loop resumed");
            return true;
        } finally {
            pauseLock.unlock();
        }
    }
}
