package io.druid.indexing.kafka;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.FullResponseHolder;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.kafka.KafkaIndexTask;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import java.io.IOException;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.Callable;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;

/* loaded from: input_file:io/druid/indexing/kafka/KafkaIndexTaskClient.class */
public class KafkaIndexTaskClient {
    /** Maximum wait between retries, in seconds. */
    public static final int MAX_RETRY_WAIT_SECONDS = 10;
    /** Minimum wait between retries, in seconds. */
    private static final int MIN_RETRY_WAIT_SECONDS = 2;
    private static final EmittingLogger log = new EmittingLogger(KafkaIndexTaskClient.class);
    // NOTE(review): BASE_PATH and TASK_MISMATCH_RETRY_DELAY_SECONDS are not referenced by any code visible
    // in this file; presumably they are used by the request-submission logic -- confirm.
    private static final String BASE_PATH = "/druid/worker/v1/chat";
    private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;

    private final HttpClient httpClient;
    private final ObjectMapper jsonMapper;
    private final TaskInfoProvider taskInfoProvider;
    private final Duration httpTimeout;
    // Assigned in the constructor (not in a field initializer) because building it reads numRetries.
    private final RetryPolicyFactory retryPolicyFactory;
    private final ListeningExecutorService executorService;
    private final long numRetries;

    /** Caught by the public methods of this client, which then return a fallback value instead of failing. */
    public static class NoTaskLocationException extends RuntimeException {
        public NoTaskLocationException(String str) {
            super(str);
        }
    }

    /** Caught by {@link #stop(String, boolean)}, which treats it as "the task is no longer running". */
    public static class TaskNotRunnableException extends RuntimeException {
        public TaskNotRunnableException(String str) {
            super(str);
        }
    }

    /**
     * Creates a client together with its worker pool for the {@code *Async} methods.
     *
     * @param httpClient       HTTP client stored for request submission
     * @param objectMapper     mapper used to (de)serialize request and response bodies
     * @param taskInfoProvider stored task-info lookup
     * @param str              identifier embedded in the worker thread names
     *                         ({@code KafkaIndexTaskClient-<str>-<n>})
     * @param i                number of threads in the worker pool
     * @param duration         stored HTTP timeout
     * @param j                maximum retry count handed to the retry policy
     */
    public KafkaIndexTaskClient(HttpClient httpClient, ObjectMapper objectMapper, TaskInfoProvider taskInfoProvider, String str, int i, Duration duration, long j) {
        this.httpClient = httpClient;
        this.jsonMapper = objectMapper;
        this.taskInfoProvider = taskInfoProvider;
        this.httpTimeout = duration;
        this.numRetries = j;
        // Must come after numRetries is assigned: createRetryPolicyFactory() reads that field, and a field
        // initializer would run before this constructor body and observe the default value 0.
        this.retryPolicyFactory = createRetryPolicyFactory();
        this.executorService = MoreExecutors.listeningDecorator(
            Execs.multiThreaded(i, StringUtils.format("KafkaIndexTaskClient-%s-%%d", str)));
    }

    public void close() {
        this.executorService.shutdownNow();
    }

    /**
     * Sends a {@code stop} request to the given task.
     *
     * @param str task id
     * @param z   when {@code true}, the request carries the query string {@code publish=true}
     * @return {@code true} if the task answered with a 2xx status, or if the request was aborted because the
     *         task is no longer runnable; {@code false} if the task has no known location or any other
     *         exception occurred (the latter is logged as a warning)
     */
    public boolean stop(String str, boolean z) {
        log.debug("Stop task[%s] publish[%s]", str, z);
        try {
            final String query = z ? "publish=true" : null;
            final FullResponseHolder response = submitRequest(str, HttpMethod.POST, "stop", query, true);
            // Any 2xx status counts as success. The literal is intentional: the status class must not be
            // tied to an unrelated retry-timing constant that merely happens to have the same value.
            return response.getStatus().getCode() / 100 == 2;
        } catch (NoTaskLocationException ignored) {
            // No location for the task: report failure without logging.
            return false;
        } catch (TaskNotRunnableException ignored) {
            log.info("Task [%s] couldn't be stopped because it is no longer running", str);
            return true;
        } catch (Exception e) {
            log.warn(e, "Exception while stopping task [%s]", str);
            return false;
        }
    }

    /**
     * Sends a {@code resume} request to the given task.
     *
     * @param str task id
     * @return {@code true} if the task answered with a 2xx status; {@code false} if it answered with another
     *         status or has no known location
     */
    public boolean resume(String str) {
        log.debug("Resume task[%s]", str);
        try {
            final FullResponseHolder response = submitRequest(str, HttpMethod.POST, "resume", null, true);
            // Any 2xx status counts as success. The literal is intentional: the status class must not be
            // tied to an unrelated retry-timing constant that merely happens to have the same value.
            return response.getStatus().getCode() / 100 == 2;
        } catch (NoTaskLocationException ignored) {
            // No location for the task: report failure.
            return false;
        }
    }

    /**
     * Pauses the given task without sending a timeout.
     *
     * @param str task id
     * @return whatever {@link #pause(String, long)} returns for a timeout of {@code 0}
     */
    public Map<Integer, Long> pause(String str) {
        // A non-positive timeout makes the two-argument overload omit the "timeout" query parameter.
        final long noTimeout = 0L;
        return pause(str, noTimeout);
    }

    /**
     * Sends a {@code pause} request to the given task and waits until the task reports itself as paused.
     *
     * <p>If the response status is {@code 200 OK}, the response body is parsed and returned. Otherwise the
     * task status is polled, sleeping between polls for the delay supplied by a fresh retry policy, until the
     * status is {@code PAUSED}; the current offsets are then fetched and returned.
     *
     * @param str task id
     * @param j   timeout sent as the {@code timeout} query parameter; the parameter is omitted when
     *            {@code j <= 0}
     * @return partition-to-offset map reported by the task, or an empty map if the task has no known location
     * @throws ISE              if the retry policy is exhausted before the task reports {@code PAUSED}
     * @throws RuntimeException wrapping an {@link IOException} or {@link InterruptedException} raised while
     *                          parsing the response or sleeping between polls
     */
    public Map<Integer, Long> pause(String str, long j) {
        log.debug("Pause task[%s] timeout[%d]", str, j);
        try {
            final String query = j > 0 ? StringUtils.format("timeout=%d", j) : null;
            final FullResponseHolder response = submitRequest(str, HttpMethod.POST, "pause", query, true);

            if (response.getStatus().equals(HttpResponseStatus.OK)) {
                log.info("Task [%s] paused successfully", str);
                return this.jsonMapper.readValue(response.getContent(), new TypeReference<Map<Integer, Long>>() {
                });
            }

            // Any other status: poll until the task reports PAUSED or the retry policy gives up.
            final RetryPolicy retryPolicy = this.retryPolicyFactory.makeRetryPolicy();
            while (getStatus(str) != KafkaIndexTask.Status.PAUSED) {
                final Duration delay = retryPolicy.getAndIncrementRetryDelay();
                if (delay == null) {
                    // A null delay is the retry policy's signal that no retries remain.
                    log.error("Task [%s] failed to pause, aborting", str);
                    throw new ISE("Task [%s] failed to pause, aborting", str);
                }
                final long sleepMillis = delay.getMillis();
                log.info("Still waiting for task [%s] to pause; will try again in [%s]", str, new Duration(sleepMillis).toString());
                Thread.sleep(sleepMillis);
            }
            return getCurrentOffsets(str, true);
        } catch (NoTaskLocationException e) {
            log.error("Exception [%s] while pausing Task [%s]", e.getMessage(), str);
            return ImmutableMap.of();
        } catch (IOException e) {
            log.error("Exception [%s] while pausing Task [%s]", e.getMessage(), str);
            throw Throwables.propagate(e);
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack (e.g. the executor running
            // pauseAsync) can still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
            log.error("Exception [%s] while pausing Task [%s]", e.getMessage(), str);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Fetches the task's status via a {@code GET status} request.
     *
     * @param str task id
     * @return the status deserialized from the response body, or {@code NOT_STARTED} if the task has no
     *         known location
     * @throws RuntimeException wrapping an {@link IOException} if the response body cannot be deserialized
     */
    public KafkaIndexTask.Status getStatus(String str) {
        log.debug("GetStatus task[%s]", str);
        try {
            final FullResponseHolder response = submitRequest(str, HttpMethod.GET, "status", null, true);
            final String body = response.getContent();
            return this.jsonMapper.readValue(body, KafkaIndexTask.Status.class);
        } catch (NoTaskLocationException ignored) {
            // No location for the task: report it as not started.
            return KafkaIndexTask.Status.NOT_STARTED;
        } catch (IOException ioe) {
            throw Throwables.propagate(ioe);
        }
    }

    /**
     * Fetches the task's start time via a {@code GET time/start} request.
     *
     * @param str task id
     * @return the deserialized start time, or {@code null} if the response body is null or empty or the task
     *         has no known location
     * @throws RuntimeException wrapping an {@link IOException} if the response body cannot be deserialized
     */
    public DateTime getStartTime(String str) {
        log.debug("GetStartTime task[%s]", str);
        try {
            final FullResponseHolder response = submitRequest(str, HttpMethod.GET, "time/start", null, true);
            final boolean hasBody = response.getContent() != null && !response.getContent().isEmpty();
            if (hasBody) {
                return this.jsonMapper.readValue(response.getContent(), DateTime.class);
            }
            return null;
        } catch (NoTaskLocationException ignored) {
            // No location for the task: there is no start time to report.
            return null;
        } catch (IOException ioe) {
            throw Throwables.propagate(ioe);
        }
    }

    /**
     * Fetches the task's current offsets via a {@code GET offsets/current} request.
     *
     * @param str task id
     * @param z   retry flag, forwarded unchanged to the request submission
     * @return the map deserialized from the response body, or an empty map if the task has no known location
     * @throws RuntimeException wrapping an {@link IOException} if the response body cannot be deserialized
     */
    public Map<Integer, Long> getCurrentOffsets(String str, boolean z) {
        log.debug("GetCurrentOffsets task[%s] retry[%s]", str, z);
        try {
            final FullResponseHolder response = submitRequest(str, HttpMethod.GET, "offsets/current", null, z);
            final TypeReference<Map<Integer, Long>> offsetsType = new TypeReference<Map<Integer, Long>>() {
            };
            return this.jsonMapper.readValue(response.getContent(), offsetsType);
        } catch (NoTaskLocationException ignored) {
            // No location for the task: no offsets to report.
            return ImmutableMap.of();
        } catch (IOException ioe) {
            throw Throwables.propagate(ioe);
        }
    }

    /**
     * Fetches the task's end offsets via a {@code GET offsets/end} request.
     *
     * @param str task id
     * @return the map deserialized from the response body, or an empty map if the task has no known location
     * @throws RuntimeException wrapping an {@link IOException} if the response body cannot be deserialized
     */
    public Map<Integer, Long> getEndOffsets(String str) {
        log.debug("GetEndOffsets task[%s]", str);
        try {
            final FullResponseHolder response = submitRequest(str, HttpMethod.GET, "offsets/end", null, true);
            final TypeReference<Map<Integer, Long>> offsetsType = new TypeReference<Map<Integer, Long>>() {
            };
            return this.jsonMapper.readValue(response.getContent(), offsetsType);
        } catch (NoTaskLocationException ignored) {
            // No location for the task: no offsets to report.
            return ImmutableMap.of();
        } catch (IOException ioe) {
            throw Throwables.propagate(ioe);
        }
    }

    /**
     * Sets the task's end offsets without asking it to resume.
     *
     * @param str task id
     * @param map end offsets to send
     * @return whatever {@link #setEndOffsets(String, Map, boolean)} returns with {@code resume == false}
     */
    public boolean setEndOffsets(String str, Map<Integer, Long> map) {
        final boolean resume = false;
        return setEndOffsets(str, map, resume);
    }

    /**
     * Posts new end offsets to the task via a {@code POST offsets/end} request whose body is {@code map}
     * serialized by the JSON mapper.
     *
     * @param str task id
     * @param map end offsets to send
     * @param z   when {@code true}, the request carries the query string {@code resume=true}
     * @return {@code true} if the task answered with a 2xx status; {@code false} if it answered with another
     *         status or has no known location
     * @throws RuntimeException wrapping an {@link IOException} if {@code map} cannot be serialized
     */
    public boolean setEndOffsets(String str, Map<Integer, Long> map, boolean z) {
        log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s]", str, map, z);
        try {
            final String query = z ? "resume=true" : null;
            final byte[] body = this.jsonMapper.writeValueAsBytes(map);
            final FullResponseHolder response = submitRequest(str, HttpMethod.POST, "offsets/end", query, body, true);
            // Any 2xx status counts as success. The literal is intentional: the status class must not be
            // tied to an unrelated retry-timing constant that merely happens to have the same value.
            return response.getStatus().getCode() / 100 == 2;
        } catch (NoTaskLocationException ignored) {
            // No location for the task: report failure.
            return false;
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Runs {@link #stop(String, boolean)} on this client's executor.
     *
     * @param str task id
     * @param z   publish flag forwarded to {@code stop}
     * @return future holding the result of {@code stop}
     */
    public ListenableFuture<Boolean> stopAsync(final String str, final boolean z) {
        final Callable<Boolean> stopCall = new Callable<Boolean>() {
            @Override
            public Boolean call() {
                return stop(str, z);
            }
        };
        return this.executorService.submit(stopCall);
    }

    /**
     * Runs {@link #resume(String)} on this client's executor.
     *
     * @param str task id
     * @return future holding the result of {@code resume}
     */
    public ListenableFuture<Boolean> resumeAsync(final String str) {
        final Callable<Boolean> resumeCall = new Callable<Boolean>() {
            @Override
            public Boolean call() {
                return resume(str);
            }
        };
        return this.executorService.submit(resumeCall);
    }

    /**
     * Asynchronous pause without a timeout.
     *
     * @param str task id
     * @return whatever {@link #pauseAsync(String, long)} returns for a timeout of {@code 0}
     */
    public ListenableFuture<Map<Integer, Long>> pauseAsync(String str) {
        final long noTimeout = 0L;
        return pauseAsync(str, noTimeout);
    }

    /**
     * Runs {@link #pause(String, long)} on this client's executor.
     *
     * @param str task id
     * @param j   timeout forwarded to {@code pause}
     * @return future holding the result of {@code pause}
     */
    public ListenableFuture<Map<Integer, Long>> pauseAsync(final String str, final long j) {
        final Callable<Map<Integer, Long>> pauseCall = new Callable<Map<Integer, Long>>() {
            @Override
            public Map<Integer, Long> call() {
                return pause(str, j);
            }
        };
        return this.executorService.submit(pauseCall);
    }

    /**
     * Runs {@link #getStatus(String)} on this client's executor.
     *
     * @param str task id
     * @return future holding the result of {@code getStatus}
     */
    public ListenableFuture<KafkaIndexTask.Status> getStatusAsync(final String str) {
        final Callable<KafkaIndexTask.Status> statusCall = new Callable<KafkaIndexTask.Status>() {
            @Override
            public KafkaIndexTask.Status call() {
                return getStatus(str);
            }
        };
        return this.executorService.submit(statusCall);
    }

    /**
     * Runs {@link #getStartTime(String)} on this client's executor.
     *
     * @param str task id
     * @return future holding the result of {@code getStartTime}
     */
    public ListenableFuture<DateTime> getStartTimeAsync(final String str) {
        final Callable<DateTime> startTimeCall = new Callable<DateTime>() {
            @Override
            public DateTime call() {
                return getStartTime(str);
            }
        };
        return this.executorService.submit(startTimeCall);
    }

    /**
     * Runs {@link #getCurrentOffsets(String, boolean)} on this client's executor.
     *
     * @param str task id
     * @param z   retry flag forwarded to {@code getCurrentOffsets}
     * @return future holding the result of {@code getCurrentOffsets}
     */
    public ListenableFuture<Map<Integer, Long>> getCurrentOffsetsAsync(final String str, final boolean z) {
        final Callable<Map<Integer, Long>> offsetsCall = new Callable<Map<Integer, Long>>() {
            @Override
            public Map<Integer, Long> call() {
                return getCurrentOffsets(str, z);
            }
        };
        return this.executorService.submit(offsetsCall);
    }

    /**
     * Runs {@link #getEndOffsets(String)} on this client's executor.
     *
     * @param str task id
     * @return future holding the result of {@code getEndOffsets}
     */
    public ListenableFuture<Map<Integer, Long>> getEndOffsetsAsync(final String str) {
        final Callable<Map<Integer, Long>> offsetsCall = new Callable<Map<Integer, Long>>() {
            @Override
            public Map<Integer, Long> call() {
                return getEndOffsets(str);
            }
        };
        return this.executorService.submit(offsetsCall);
    }

    /**
     * Asynchronously sets the task's end offsets without asking it to resume.
     *
     * @param str task id
     * @param map end offsets to send
     * @return whatever {@link #setEndOffsetsAsync(String, Map, boolean)} returns with {@code resume == false}
     */
    public ListenableFuture<Boolean> setEndOffsetsAsync(String str, Map<Integer, Long> map) {
        final boolean resume = false;
        return setEndOffsetsAsync(str, map, resume);
    }

    /**
     * Runs {@link #setEndOffsets(String, Map, boolean)} on this client's executor.
     *
     * @param str task id
     * @param map end offsets to send
     * @param z   resume flag forwarded to {@code setEndOffsets}
     * @return future holding the result of {@code setEndOffsets}
     */
    public ListenableFuture<Boolean> setEndOffsetsAsync(final String str, final Map<Integer, Long> map, final boolean z) {
        final Callable<Boolean> setOffsetsCall = new Callable<Boolean>() {
            @Override
            public Boolean call() {
                return setEndOffsets(str, map, z);
            }
        };
        return this.executorService.submit(setOffsetsCall);
    }

    /**
     * Builds the retry policy factory: minimum wait {@link #MIN_RETRY_WAIT_SECONDS} seconds, maximum wait
     * {@link #MAX_RETRY_WAIT_SECONDS} seconds, and a maximum retry count of {@code numRetries}.
     *
     * <p>Reads {@code numRetries} at the time of the call, so that field must already hold its final value
     * when this method is invoked.
     *
     * @return a new factory with the configuration described above
     */
    @VisibleForTesting
    RetryPolicyFactory createRetryPolicyFactory() {
        final RetryPolicyConfig config = new RetryPolicyConfig()
            .setMinWait(Period.seconds(MIN_RETRY_WAIT_SECONDS))
            // Use the declared constant rather than repeating its value as a literal.
            .setMaxWait(Period.seconds(MAX_RETRY_WAIT_SECONDS))
            .setMaxRetryCount(this.numRetries);
        return new RetryPolicyFactory(config);
    }

    /**
     * Verifies that a TCP connection to the given endpoint can be opened, then closes it right away.
     *
     * @param str host to connect to
     * @param i   port to connect to
     * @throws IOException if the socket cannot be opened or closed
     */
    @VisibleForTesting
    void checkConnection(String str, int i) throws IOException {
        final Socket probe = new Socket(str, i);
        probe.close();
    }

    /**
     * Convenience overload for requests without a payload: delegates to the six-argument
     * {@code submitRequest} with an empty byte array as the body.
     */
    private FullResponseHolder submitRequest(String str, HttpMethod httpMethod, String str2, String str3, boolean z) {
        final byte[] emptyBody = new byte[0];
        return submitRequest(str, httpMethod, str2, str3, emptyBody, z);
    }

    /* JADX WARN: Code restructure failed: missing block: B:93:0x0062, code lost:
    
        throw new io.druid.indexing.kafka.KafkaIndexTaskClient.TaskNotRunnableException(io.druid.java.util.common.StringUtils.format("Aborting request because task [%s] is not runnable", new java.lang.Object[]{r11}));
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Request-submission entry point that every public method of this client funnels through.
     *
     * <p>NOTE(review): the decompiler could not recover this method's body (816 bytecode instructions were
     * skipped). As written here it is a placeholder that unconditionally throws
     * {@link UnsupportedOperationException}, so every caller in this class fails at runtime. The real
     * implementation must be restored from the original source or bytecode before this class is usable.
     *
     * <p>The only recovered fragment (see the JADX warning above) shows that the original throws
     * {@code TaskNotRunnableException} with the message
     * {@code "Aborting request because task [%s] is not runnable"} formatted with {@code r11}.
     *
     * <p>Parameter roles, as far as the call sites in this file show:
     *
     * @param r11 task id
     * @param r12 HTTP method of the request
     * @param r13 request path suffix, e.g. {@code "stop"}, {@code "status"}, {@code "offsets/end"}
     * @param r14 query string such as {@code "publish=true"}, or {@code null} for none
     * @param r15 request body bytes; the five-argument overload passes an empty array
     * @param r16 flag that {@code getCurrentOffsets} logs as {@code retry}; presumably enables retrying --
     *            TODO confirm against the original implementation
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    private com.metamx.http.client.response.FullResponseHolder submitRequest(java.lang.String r11, org.jboss.netty.handler.codec.http.HttpMethod r12, java.lang.String r13, java.lang.String r14, byte[] r15, boolean r16) {
        /*
            Method dump skipped, instructions count: 816
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.druid.indexing.kafka.KafkaIndexTaskClient.submitRequest(java.lang.String, org.jboss.netty.handler.codec.http.HttpMethod, java.lang.String, java.lang.String, byte[], boolean):com.metamx.http.client.response.FullResponseHolder");
    }
}
