package io.druid.indexing.kafka;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.FullResponseHandler;
import com.metamx.http.client.response.FullResponseHolder;
import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.kafka.KafkaIndexTask;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.util.Map;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;

/* loaded from: input_file:io/druid/indexing/kafka/KafkaIndexTaskClient.class */
public class KafkaIndexTaskClient {
    private static final EmittingLogger log = new EmittingLogger(KafkaIndexTaskClient.class);
    private static final String BASE_PATH = "/druid/worker/v1/chat";
    private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
    private final HttpClient httpClient;
    private final ObjectMapper jsonMapper;
    private final TaskInfoProvider taskInfoProvider;
    private final RetryPolicyFactory retryPolicyFactory = createRetryPolicyFactory();

    /* loaded from: input_file:io/druid/indexing/kafka/KafkaIndexTaskClient$NoTaskLocationException.class */
    public class NoTaskLocationException extends RuntimeException {
        public NoTaskLocationException(String str) {
            super(str);
        }
    }

    /* loaded from: input_file:io/druid/indexing/kafka/KafkaIndexTaskClient$TaskNotRunnableException.class */
    public class TaskNotRunnableException extends RuntimeException {
        public TaskNotRunnableException(String str) {
            super(str);
        }
    }

    public KafkaIndexTaskClient(HttpClient httpClient, ObjectMapper objectMapper, TaskInfoProvider taskInfoProvider) {
        this.httpClient = httpClient;
        this.jsonMapper = objectMapper;
        this.taskInfoProvider = taskInfoProvider;
    }

    public void stop(String str, boolean z) {
        submitRequest(str, HttpMethod.POST, "stop", z ? "publish=true" : null, true);
    }

    public void resume(String str) {
        submitRequest(str, HttpMethod.POST, "resume", null, true);
    }

    public Map<Integer, Long> pause(String str) {
        return pause(str, 0L);
    }

    public Map<Integer, Long> pause(String str, long j) {
        try {
            FullResponseHolder submitRequest = submitRequest(str, HttpMethod.POST, "pause", j > 0 ? String.format("timeout=%d", Long.valueOf(j)) : null, true);
            if (submitRequest.getStatus().equals(HttpResponseStatus.OK)) {
                return (Map) this.jsonMapper.readValue(submitRequest.getContent(), new TypeReference<Map<Integer, Long>>() { // from class: io.druid.indexing.kafka.KafkaIndexTaskClient.1
                });
            }
            RetryPolicy makeRetryPolicy = this.retryPolicyFactory.makeRetryPolicy();
            while (getStatus(str) != KafkaIndexTask.Status.PAUSED) {
                Duration andIncrementRetryDelay = makeRetryPolicy.getAndIncrementRetryDelay();
                if (andIncrementRetryDelay == null) {
                    throw new ISE("Task [%s] failed to pause, aborting", new Object[]{str});
                }
                long millis = andIncrementRetryDelay.getMillis();
                log.info("Still waiting for task [%s] to pause; will try again in [%s]", new Object[]{str, new Duration(millis).toString()});
                Thread.sleep(millis);
            }
            return getCurrentOffsets(str, true);
        } catch (IOException | InterruptedException e) {
            throw Throwables.propagate(e);
        }
    }

    public KafkaIndexTask.Status getStatus(String str) {
        try {
            return (KafkaIndexTask.Status) this.jsonMapper.readValue(submitRequest(str, HttpMethod.GET, "status", null, true).getContent(), KafkaIndexTask.Status.class);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    public DateTime getStartTime(String str, boolean z) {
        try {
            FullResponseHolder submitRequest = submitRequest(str, HttpMethod.GET, "time/start", null, z);
            if (submitRequest.getContent() == null || submitRequest.getContent().isEmpty()) {
                return null;
            }
            return (DateTime) this.jsonMapper.readValue(submitRequest.getContent(), DateTime.class);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    public Map<Integer, Long> getCurrentOffsets(String str, boolean z) {
        try {
            return (Map) this.jsonMapper.readValue(submitRequest(str, HttpMethod.GET, "offsets/current", null, z).getContent(), new TypeReference<Map<Integer, Long>>() { // from class: io.druid.indexing.kafka.KafkaIndexTaskClient.2
            });
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    public Map<Integer, Long> getEndOffsets(String str) {
        try {
            return (Map) this.jsonMapper.readValue(submitRequest(str, HttpMethod.GET, "offsets/end", null, true).getContent(), new TypeReference<Map<Integer, Long>>() { // from class: io.druid.indexing.kafka.KafkaIndexTaskClient.3
            });
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    public void setEndOffsets(String str, Map<Integer, Long> map) {
        setEndOffsets(str, map, false);
    }

    public void setEndOffsets(String str, Map<Integer, Long> map, boolean z) {
        try {
            submitRequest(str, HttpMethod.POST, "offsets/end", z ? "resume=true" : null, this.jsonMapper.writeValueAsBytes(map), true);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    @VisibleForTesting
    RetryPolicyFactory createRetryPolicyFactory() {
        return new RetryPolicyFactory(new RetryPolicyConfig().setMinWait(Period.seconds(2)).setMaxWait(Period.seconds(8)).setMaxRetryCount(8L));
    }

    @VisibleForTesting
    void checkConnection(String str, int i) throws IOException {
        new Socket(str, i).close();
    }

    private FullResponseHolder submitRequest(String str, HttpMethod httpMethod, String str2, String str3, boolean z) {
        return submitRequest(str, httpMethod, str2, str3, new byte[0], z);
    }

    private FullResponseHolder submitRequest(String str, HttpMethod httpMethod, String str2, String str3, byte[] bArr, boolean z) {
        Duration andIncrementRetryDelay;
        RetryPolicy makeRetryPolicy = this.retryPolicyFactory.makeRetryPolicy();
        while (true) {
            FullResponseHolder fullResponseHolder = null;
            Request request = null;
            Optional taskStatus = this.taskInfoProvider.getTaskStatus(str);
            if (!taskStatus.isPresent() || !((TaskStatus) taskStatus.get()).isRunnable()) {
                break;
            }
            try {
                TaskLocation taskLocation = this.taskInfoProvider.getTaskLocation(str);
                if (taskLocation.equals(TaskLocation.unknown())) {
                    log.info("No TaskLocation available for task [%s], this task may not have been assigned to a worker yet", new Object[]{str});
                    throw new NoTaskLocationException(String.format("No TaskLocation available for task [%s]", str));
                }
                checkConnection(taskLocation.getHost(), taskLocation.getPort());
                try {
                    Request request2 = new Request(httpMethod, new URI("http", null, taskLocation.getHost(), taskLocation.getPort(), String.format("%s/%s/%s", BASE_PATH, str, str2), str3, null).toURL());
                    request2.addHeader("X-Druid-Task-Id", str);
                    if (bArr.length > 0) {
                        request2.setContent("application/json", bArr);
                    }
                    FullResponseHolder fullResponseHolder2 = (FullResponseHolder) this.httpClient.go(request2, new FullResponseHandler(Charsets.UTF_8)).get();
                    int code = fullResponseHolder2.getStatus().getCode();
                    if (code / 100 == 2) {
                        return fullResponseHolder2;
                    }
                    if (code == 400) {
                        throw new IAE("Received 400 Bad Request with body: %s", new Object[]{fullResponseHolder2.getContent()});
                    }
                    throw new IOException(String.format("Received status [%d]", Integer.valueOf(code)));
                } catch (Exception e) {
                    Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
                    Throwables.propagateIfInstanceOf(e.getCause(), ChannelException.class);
                    throw Throwables.propagate(e);
                }
            } catch (IOException | ChannelException e2) {
                if (0 == 0 || !fullResponseHolder.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
                    andIncrementRetryDelay = makeRetryPolicy.getAndIncrementRetryDelay();
                } else {
                    String str4 = fullResponseHolder.getResponse().headers().get("X-Druid-Task-Id");
                    if (str4 == null || str4.equals(str)) {
                        andIncrementRetryDelay = makeRetryPolicy.getAndIncrementRetryDelay();
                    } else {
                        log.warn("Expected worker to have taskId [%s] but has taskId [%s], will retry in [%d]s", new Object[]{str, str4, Integer.valueOf(TASK_MISMATCH_RETRY_DELAY_SECONDS)});
                        andIncrementRetryDelay = Duration.standardSeconds(5L);
                    }
                }
                if (!z || andIncrementRetryDelay == null) {
                    Throwables.propagate(e2);
                } else {
                    try {
                        long millis = andIncrementRetryDelay.getMillis();
                        EmittingLogger emittingLogger = log;
                        Object[] objArr = new Object[4];
                        objArr[0] = Integer.valueOf(0 != 0 ? fullResponseHolder.getStatus().getCode() : 0);
                        objArr[1] = 0 != 0 ? request.getUrl() : "-";
                        objArr[2] = new Duration(millis).toString();
                        objArr[3] = 0 != 0 ? fullResponseHolder.getContent() : "[empty]";
                        emittingLogger.debug("Bad response HTTP [%d] from %s; will try again in [%s] (body: [%s])", objArr);
                        Thread.sleep(millis);
                    } catch (InterruptedException e3) {
                        Throwables.propagate(e3);
                    }
                }
            }
        }
        throw new TaskNotRunnableException(String.format("Aborting request because task [%s] is not runnable", str));
    }
}
