package io.druid.indexing.overlord.http;

import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import io.druid.common.config.JacksonConfigManager;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.http.security.TaskResourceFilter;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.EntryExistsException;
import io.druid.server.http.security.ConfigResourceFilter;
import io.druid.server.http.security.StateResourceFilter;
import io.druid.server.security.Access;
import io.druid.server.security.Action;
import io.druid.server.security.AuthConfig;
import io.druid.server.security.AuthorizationInfo;
import io.druid.server.security.Resource;
import io.druid.server.security.ResourceType;
import io.druid.tasklogs.TaskLogStreamer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.joda.time.DateTime;
import org.joda.time.Interval;

@Path("/druid/indexer/v1")
/* loaded from: input_file:io/druid/indexing/overlord/http/OverlordResource.class */
public class OverlordResource {
    private static final Logger log = new Logger(OverlordResource.class);
    private final TaskMaster taskMaster;
    private final TaskStorageQueryAdapter taskStorageQueryAdapter;
    private final TaskLogStreamer taskLogStreamer;
    private final JacksonConfigManager configManager;
    private final AuditManager auditManager;
    private final AuthConfig authConfig;
    private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/druid/indexing/overlord/http/OverlordResource$TaskResponseObject.class */
    public static class TaskResponseObject {
        private final String id;
        private final DateTime createdTime;
        private final DateTime queueInsertionTime;
        private final Optional<TaskStatus> status;
        private final TaskLocation location;

        private TaskResponseObject(String str, DateTime dateTime, DateTime dateTime2, Optional<TaskStatus> optional, TaskLocation taskLocation) {
            this.id = str;
            this.createdTime = dateTime;
            this.queueInsertionTime = dateTime2;
            this.status = optional;
            this.location = taskLocation;
        }

        @JsonValue
        public Map<String, Object> toJson() {
            LinkedHashMap newLinkedHashMap = Maps.newLinkedHashMap();
            newLinkedHashMap.put("id", this.id);
            if (this.createdTime.getMillis() > 0) {
                newLinkedHashMap.put("createdTime", this.createdTime);
            }
            if (this.queueInsertionTime.getMillis() > 0) {
                newLinkedHashMap.put("queueInsertionTime", this.queueInsertionTime);
            }
            if (this.status.isPresent()) {
                newLinkedHashMap.put("statusCode", ((TaskStatus) this.status.get()).getStatusCode().toString());
                if (((TaskStatus) this.status.get()).isComplete()) {
                    newLinkedHashMap.put("duration", Long.valueOf(((TaskStatus) this.status.get()).getDuration()));
                }
            }
            if (this.location != null) {
                newLinkedHashMap.put("location", this.location);
            }
            return newLinkedHashMap;
        }
    }

    @Inject
    public OverlordResource(TaskMaster taskMaster, TaskStorageQueryAdapter taskStorageQueryAdapter, TaskLogStreamer taskLogStreamer, JacksonConfigManager jacksonConfigManager, AuditManager auditManager, AuthConfig authConfig) throws Exception {
        this.taskMaster = taskMaster;
        this.taskStorageQueryAdapter = taskStorageQueryAdapter;
        this.taskLogStreamer = taskLogStreamer;
        this.configManager = jacksonConfigManager;
        this.auditManager = auditManager;
        this.authConfig = authConfig;
    }

    @Path("/task")
    @Consumes({"application/json"})
    @POST
    @Produces({"application/json"})
    public Response taskPost(final Task task, @Context HttpServletRequest httpServletRequest) {
        if (this.authConfig.isEnabled()) {
            String dataSource = task.getDataSource();
            AuthorizationInfo authorizationInfo = (AuthorizationInfo) httpServletRequest.getAttribute("Druid-Auth-Token");
            Preconditions.checkNotNull(authorizationInfo, "Security is enabled but no authorization info found in the request");
            Access isAuthorized = authorizationInfo.isAuthorized(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE);
            if (!isAuthorized.isAllowed()) {
                return Response.status(Response.Status.FORBIDDEN).header("Access-Check-Result", isAuthorized).build();
            }
        }
        return asLeaderWith(this.taskMaster.getTaskQueue(), new Function<TaskQueue, Response>() { // from class: io.druid.indexing.overlord.http.OverlordResource.1
            public Response apply(TaskQueue taskQueue) {
                try {
                    taskQueue.add(task);
                    return Response.ok(ImmutableMap.of("task", task.getId())).build();
                } catch (EntryExistsException e) {
                    return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", String.format("Task[%s] already exists!", task.getId()))).build();
                }
            }
        });
    }

    @GET
    @Path("/leader")
    @ResourceFilters({StateResourceFilter.class})
    @Produces({"application/json"})
    public Response getLeader() {
        return Response.ok(this.taskMaster.getLeader()).build();
    }

    @GET
    @Path("/task/{taskid}")
    @ResourceFilters({TaskResourceFilter.class})
    @Produces({"application/json"})
    public Response getTaskPayload(@PathParam("taskid") String str) {
        return optionalTaskResponse(str, "payload", this.taskStorageQueryAdapter.getTask(str));
    }

    @GET
    @Path("/task/{taskid}/status")
    @ResourceFilters({TaskResourceFilter.class})
    @Produces({"application/json"})
    public Response getTaskStatus(@PathParam("taskid") String str) {
        return optionalTaskResponse(str, "status", this.taskStorageQueryAdapter.getStatus(str));
    }

    @GET
    @Path("/task/{taskid}/segments")
    @ResourceFilters({TaskResourceFilter.class})
    @Produces({"application/json"})
    public Response getTaskSegments(@PathParam("taskid") String str) {
        return Response.ok().entity(this.taskStorageQueryAdapter.getInsertedSegments(str)).build();
    }

    @Path("/task/{taskid}/shutdown")
    @ResourceFilters({TaskResourceFilter.class})
    @POST
    @Produces({"application/json"})
    public Response doShutdown(@PathParam("taskid") final String str) {
        return asLeaderWith(this.taskMaster.getTaskQueue(), new Function<TaskQueue, Response>() { // from class: io.druid.indexing.overlord.http.OverlordResource.2
            public Response apply(TaskQueue taskQueue) {
                taskQueue.shutdown(str);
                return Response.ok(ImmutableMap.of("task", str)).build();
            }
        });
    }

    @GET
    @Path("/worker")
    @ResourceFilters({ConfigResourceFilter.class})
    @Produces({"application/json"})
    public Response getWorkerConfig() {
        if (this.workerConfigRef == null) {
            this.workerConfigRef = this.configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class);
        }
        return Response.ok(this.workerConfigRef.get()).build();
    }

    @Path("/worker")
    @Consumes({"application/json"})
    @ResourceFilters({ConfigResourceFilter.class})
    @POST
    public Response setWorkerConfig(WorkerBehaviorConfig workerBehaviorConfig, @HeaderParam("X-Druid-Author") @DefaultValue("") String str, @HeaderParam("X-Druid-Comment") @DefaultValue("") String str2, @Context HttpServletRequest httpServletRequest) {
        if (!this.configManager.set(WorkerBehaviorConfig.CONFIG_KEY, workerBehaviorConfig, new AuditInfo(str, str2, httpServletRequest.getRemoteAddr()))) {
            return Response.status(Response.Status.BAD_REQUEST).build();
        }
        log.info("Updating Worker configs: %s", new Object[]{workerBehaviorConfig});
        return Response.ok().build();
    }

    @GET
    @Path("/worker/history")
    @ResourceFilters({ConfigResourceFilter.class})
    @Produces({"application/json"})
    public Response getWorkerConfigHistory(@QueryParam("interval") String str, @QueryParam("count") Integer num) {
        Interval interval = str == null ? null : new Interval(str);
        if (interval != null || num == null) {
            return Response.ok(this.auditManager.fetchAuditHistory(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.CONFIG_KEY, interval)).build();
        }
        try {
            return Response.ok(this.auditManager.fetchAuditHistory(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.CONFIG_KEY, num.intValue())).build();
        } catch (IllegalArgumentException e) {
            return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", e.getMessage())).build();
        }
    }

    @Path("/action")
    @ResourceFilters({StateResourceFilter.class})
    @POST
    @Produces({"application/json"})
    public Response doAction(final TaskActionHolder taskActionHolder) {
        return asLeaderWith(this.taskMaster.getTaskActionClient(taskActionHolder.getTask()), new Function<TaskActionClient, Response>() { // from class: io.druid.indexing.overlord.http.OverlordResource.3
            public Response apply(TaskActionClient taskActionClient) {
                try {
                    Object submit = taskActionClient.submit(taskActionHolder.getAction());
                    HashMap newHashMap = Maps.newHashMap();
                    newHashMap.put("result", submit);
                    return Response.ok().entity(newHashMap).build();
                } catch (IOException e) {
                    OverlordResource.log.warn(e, "Failed to perform task action", new Object[0]);
                    return Response.serverError().build();
                }
            }
        });
    }

    @GET
    @Produces({"application/json"})
    @Path("/waitingTasks")
    public Response getWaitingTasks(@Context final HttpServletRequest httpServletRequest) {
        return workItemsResponse(new Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>>() { // from class: io.druid.indexing.overlord.http.OverlordResource.4
            public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner) {
                ImmutableList immutableList;
                ImmutableList activeTasks = OverlordResource.this.taskStorageQueryAdapter.getActiveTasks();
                if (OverlordResource.this.authConfig.isEnabled()) {
                    final HashMap hashMap = new HashMap();
                    final AuthorizationInfo authorizationInfo = (AuthorizationInfo) httpServletRequest.getAttribute("Druid-Auth-Token");
                    immutableList = ImmutableList.copyOf(Iterables.filter(activeTasks, new Predicate<Task>() { // from class: io.druid.indexing.overlord.http.OverlordResource.4.1
                        public boolean apply(Task task) {
                            Pair pair = new Pair(new Resource(task.getDataSource(), ResourceType.DATASOURCE), Action.READ);
                            if (hashMap.containsKey(pair)) {
                                return ((Access) hashMap.get(pair)).isAllowed();
                            }
                            Access isAuthorized = authorizationInfo.isAuthorized((Resource) pair.lhs, (Action) pair.rhs);
                            hashMap.put(pair, isAuthorized);
                            return isAuthorized.isAllowed();
                        }
                    }));
                } else {
                    immutableList = activeTasks;
                }
                HashSet newHashSet = Sets.newHashSet(Iterables.transform(taskRunner.getKnownTasks(), new Function<TaskRunnerWorkItem, String>() { // from class: io.druid.indexing.overlord.http.OverlordResource.4.2
                    public String apply(TaskRunnerWorkItem taskRunnerWorkItem) {
                        return taskRunnerWorkItem.getTaskId();
                    }
                }));
                ArrayList newArrayList = Lists.newArrayList();
                for (Task task : immutableList) {
                    if (!newHashSet.contains(task.getId())) {
                        newArrayList.add(new TaskRunnerWorkItem(task.getId(), SettableFuture.create(), new DateTime(0L), new DateTime(0L)) { // from class: io.druid.indexing.overlord.http.OverlordResource.4.3
                            @Override // io.druid.indexing.overlord.TaskRunnerWorkItem
                            public TaskLocation getLocation() {
                                return TaskLocation.unknown();
                            }
                        });
                    }
                }
                return newArrayList;
            }
        });
    }

    @GET
    @Produces({"application/json"})
    @Path("/pendingTasks")
    public Response getPendingTasks(@Context final HttpServletRequest httpServletRequest) {
        return workItemsResponse(new Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>>() { // from class: io.druid.indexing.overlord.http.OverlordResource.5
            public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner) {
                return OverlordResource.this.authConfig.isEnabled() ? OverlordResource.this.securedTaskRunnerWorkItem(taskRunner.getPendingTasks(), httpServletRequest) : taskRunner.getPendingTasks();
            }
        });
    }

    @GET
    @Produces({"application/json"})
    @Path("/runningTasks")
    public Response getRunningTasks(@Context final HttpServletRequest httpServletRequest) {
        return workItemsResponse(new Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>>() { // from class: io.druid.indexing.overlord.http.OverlordResource.6
            public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner) {
                return OverlordResource.this.authConfig.isEnabled() ? OverlordResource.this.securedTaskRunnerWorkItem(taskRunner.getRunningTasks(), httpServletRequest) : taskRunner.getRunningTasks();
            }
        });
    }

    @GET
    @Produces({"application/json"})
    @Path("/completeTasks")
    public Response getCompleteTasks(@Context HttpServletRequest httpServletRequest) {
        ImmutableList recentlyFinishedTaskStatuses;
        if (this.authConfig.isEnabled()) {
            final HashMap hashMap = new HashMap();
            final AuthorizationInfo authorizationInfo = (AuthorizationInfo) httpServletRequest.getAttribute("Druid-Auth-Token");
            recentlyFinishedTaskStatuses = ImmutableList.copyOf(Iterables.filter(this.taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(), new Predicate<TaskStatus>() { // from class: io.druid.indexing.overlord.http.OverlordResource.7
                public boolean apply(TaskStatus taskStatus) {
                    String id = taskStatus.getId();
                    Optional<Task> task = OverlordResource.this.taskStorageQueryAdapter.getTask(id);
                    if (!task.isPresent()) {
                        throw new WebApplicationException(Response.serverError().entity(String.format("No task information found for task with id: [%s]", id)).build());
                    }
                    Pair pair = new Pair(new Resource(((Task) task.get()).getDataSource(), ResourceType.DATASOURCE), Action.READ);
                    if (hashMap.containsKey(pair)) {
                        return ((Access) hashMap.get(pair)).isAllowed();
                    }
                    Access isAuthorized = authorizationInfo.isAuthorized((Resource) pair.lhs, (Action) pair.rhs);
                    hashMap.put(pair, isAuthorized);
                    return isAuthorized.isAllowed();
                }
            }));
        } else {
            recentlyFinishedTaskStatuses = this.taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses();
        }
        return Response.ok(Lists.transform(recentlyFinishedTaskStatuses, new Function<TaskStatus, TaskResponseObject>() { // from class: io.druid.indexing.overlord.http.OverlordResource.8
            public TaskResponseObject apply(TaskStatus taskStatus) {
                return new TaskResponseObject(taskStatus.getId(), new DateTime(0L), new DateTime(0L), Optional.of(taskStatus), TaskLocation.unknown());
            }
        })).build();
    }

    @GET
    @Path("/workers")
    @ResourceFilters({StateResourceFilter.class})
    @Produces({"application/json"})
    public Response getWorkers() {
        return asLeaderWith(this.taskMaster.getTaskRunner(), new Function<TaskRunner, Response>() { // from class: io.druid.indexing.overlord.http.OverlordResource.9
            public Response apply(TaskRunner taskRunner) {
                if (taskRunner instanceof WorkerTaskRunner) {
                    return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build();
                }
                OverlordResource.log.debug("Task runner [%s] of type [%s] does not support listing workers", new Object[]{taskRunner, taskRunner.getClass().getCanonicalName()});
                return Response.serverError().entity(ImmutableMap.of("error", "Task Runner does not support worker listing")).build();
            }
        });
    }

    @GET
    @Path("/scaling")
    @ResourceFilters({StateResourceFilter.class})
    @Produces({"application/json"})
    public Response getScalingState() {
        Optional<ScalingStats> scalingStats = this.taskMaster.getScalingStats();
        return scalingStats.isPresent() ? Response.ok(scalingStats.get()).build() : Response.ok().build();
    }

    @GET
    @Path("/task/{taskid}/log")
    @ResourceFilters({TaskResourceFilter.class})
    @Produces({"text/plain"})
    public Response doGetLog(@PathParam("taskid") String str, @QueryParam("offset") @DefaultValue("0") long j) {
        try {
            Optional streamTaskLog = this.taskLogStreamer.streamTaskLog(str, j);
            return streamTaskLog.isPresent() ? Response.ok(((ByteSource) streamTaskLog.get()).openStream()).build() : Response.status(Response.Status.NOT_FOUND).entity("No log was found for this task. The task may not exist, or it may not have begun running yet.").build();
        } catch (Exception e) {
            log.warn(e, "Failed to stream log for task %s", new Object[]{str});
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
        }
    }

    private Response workItemsResponse(final Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>> function) {
        return asLeaderWith(this.taskMaster.getTaskRunner(), new Function<TaskRunner, Response>() { // from class: io.druid.indexing.overlord.http.OverlordResource.10
            public Response apply(TaskRunner taskRunner) {
                return Response.ok(Lists.transform(Lists.newArrayList((Iterable) function.apply(taskRunner)), new Function<TaskRunnerWorkItem, TaskResponseObject>() { // from class: io.druid.indexing.overlord.http.OverlordResource.10.1
                    public TaskResponseObject apply(TaskRunnerWorkItem taskRunnerWorkItem) {
                        return new TaskResponseObject(taskRunnerWorkItem.getTaskId(), taskRunnerWorkItem.getCreatedTime(), taskRunnerWorkItem.getQueueInsertionTime(), Optional.absent(), taskRunnerWorkItem.getLocation());
                    }
                })).build();
            }
        });
    }

    private <T> Response optionalTaskResponse(String str, String str2, Optional<T> optional) {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("task", str);
        if (!optional.isPresent()) {
            return Response.status(Response.Status.NOT_FOUND).entity(newHashMap).build();
        }
        newHashMap.put(str2, optional.get());
        return Response.status(Response.Status.OK).entity(newHashMap).build();
    }

    private <T> Response asLeaderWith(Optional<T> optional, Function<T, Response> function) {
        return optional.isPresent() ? (Response) function.apply(optional.get()) : Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Collection<? extends TaskRunnerWorkItem> securedTaskRunnerWorkItem(Collection<? extends TaskRunnerWorkItem> collection, HttpServletRequest httpServletRequest) {
        final HashMap hashMap = new HashMap();
        final AuthorizationInfo authorizationInfo = (AuthorizationInfo) httpServletRequest.getAttribute("Druid-Auth-Token");
        return Collections2.filter(collection, new Predicate<TaskRunnerWorkItem>() { // from class: io.druid.indexing.overlord.http.OverlordResource.11
            public boolean apply(TaskRunnerWorkItem taskRunnerWorkItem) {
                String taskId = taskRunnerWorkItem.getTaskId();
                Optional<Task> task = OverlordResource.this.taskStorageQueryAdapter.getTask(taskId);
                if (!task.isPresent()) {
                    throw new WebApplicationException(Response.serverError().entity(String.format("No task information found for task with id: [%s]", taskId)).build());
                }
                Pair pair = new Pair(new Resource(((Task) task.get()).getDataSource(), ResourceType.DATASOURCE), Action.READ);
                if (hashMap.containsKey(pair)) {
                    return ((Access) hashMap.get(pair)).isAllowed();
                }
                Access isAuthorized = authorizationInfo.isAuthorized((Resource) pair.lhs, (Action) pair.rhs);
                hashMap.put(pair, isAuthorized);
                return isAuthorized.isAllowed();
            }
        });
    }
}
