package io.druid.indexing.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.TestRealtimeTask;
import io.druid.indexing.common.TestTasks;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.QueryableIndexFactory;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.data.Stat;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/druid/indexing/worker/WorkerTaskMonitorTest.class */
public class WorkerTaskMonitorTest {
    private static final Joiner joiner = Joiner.on("/");
    private static final String basePath = "/test/druid";
    private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath);
    private static final String statusPath = String.format("%s/indexer/status/worker", basePath);
    private static final DruidNode DUMMY_NODE = new DruidNode("dummy", "dummy", 9000);
    private TestingCluster testingCluster;
    private CuratorFramework cf;
    private WorkerCuratorCoordinator workerCuratorCoordinator;
    private WorkerTaskMonitor workerTaskMonitor;
    private Task task;
    private Worker worker;
    private ObjectMapper jsonMapper;
    private IndexMerger indexMerger;
    private IndexMergerV9 indexMergerV9;
    private IndexIO indexIO;

    public WorkerTaskMonitorTest() {
        TestUtils testUtils = new TestUtils();
        this.jsonMapper = testUtils.getTestObjectMapper();
        this.indexMerger = testUtils.getTestIndexMerger();
        this.indexMergerV9 = testUtils.getTestIndexMergerV9();
        this.indexIO = testUtils.getTestIndexIO();
    }

    @Before
    public void setUp() throws Exception {
        this.testingCluster = new TestingCluster(1);
        this.testingCluster.start();
        this.cf = CuratorFrameworkFactory.builder().connectString(this.testingCluster.getConnectString()).retryPolicy(new ExponentialBackoffRetry(1, 10)).compressionProvider(new PotentiallyGzippedCompressionProvider(false)).build();
        this.cf.start();
        this.cf.blockUntilConnected();
        this.cf.create().creatingParentsIfNeeded().forPath(basePath);
        this.worker = new Worker("worker", "localhost", 3, "0");
        this.workerCuratorCoordinator = new WorkerCuratorCoordinator(this.jsonMapper, new IndexerZkConfig(new ZkPathsConfig() { // from class: io.druid.indexing.worker.WorkerTaskMonitorTest.1
            public String getBase() {
                return WorkerTaskMonitorTest.basePath;
            }
        }, (String) null, (String) null, (String) null, (String) null, (String) null), new TestRemoteTaskRunnerConfig(new Period("PT1S")), this.cf, this.worker);
        this.workerCuratorCoordinator.start();
        this.workerTaskMonitor = createTaskMonitor();
        TestTasks.registerSubtypes(this.jsonMapper);
        this.jsonMapper.registerSubtypes(new NamedType[]{new NamedType(TestRealtimeTask.class, "test_realtime")});
        this.workerTaskMonitor.start();
        this.task = TestTasks.immediateSuccess("test");
    }

    private WorkerTaskMonitor createTaskMonitor() {
        TaskConfig taskConfig = new TaskConfig(Files.createTempDir().toString(), (String) null, (String) null, 0, (List) null, false, (Period) null, (Period) null);
        TaskActionClientFactory taskActionClientFactory = (TaskActionClientFactory) EasyMock.createNiceMock(TaskActionClientFactory.class);
        TaskActionClient taskActionClient = (TaskActionClient) EasyMock.createNiceMock(TaskActionClient.class);
        EasyMock.expect(taskActionClientFactory.create((Task) EasyMock.anyObject())).andReturn(taskActionClient).anyTimes();
        SegmentHandoffNotifierFactory segmentHandoffNotifierFactory = (SegmentHandoffNotifierFactory) EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
        EasyMock.replay(new Object[]{taskActionClientFactory, taskActionClient, segmentHandoffNotifierFactory});
        return new WorkerTaskMonitor(this.jsonMapper, this.cf, this.workerCuratorCoordinator, new ThreadPoolTaskRunner(new TaskToolboxFactory(taskConfig, taskActionClientFactory, (ServiceEmitter) null, (DataSegmentPusher) null, (DataSegmentKiller) null, (DataSegmentMover) null, (DataSegmentArchiver) null, (DataSegmentAnnouncer) null, segmentHandoffNotifierFactory, (QueryRunnerFactoryConglomerate) null, (ExecutorService) null, (MonitorScheduler) null, new SegmentLoaderFactory(new SegmentLoaderLocalCacheManager((QueryableIndexFactory) null, new SegmentLoaderConfig() { // from class: io.druid.indexing.worker.WorkerTaskMonitorTest.2
            public List<StorageLocationConfig> getLocations() {
                return Lists.newArrayList();
            }
        }, this.jsonMapper)), this.jsonMapper, this.indexMerger, this.indexIO, (Cache) null, (CacheConfig) null, this.indexMergerV9), taskConfig, new NoopServiceEmitter(), DUMMY_NODE));
    }

    @After
    public void tearDown() throws Exception {
        this.workerCuratorCoordinator.stop();
        this.workerTaskMonitor.stop();
        this.cf.close();
        this.testingCluster.stop();
    }

    @Test(timeout = 30000)
    public void testRunTask() throws Exception {
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.worker.WorkerTaskMonitorTest.3
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                try {
                    return WorkerTaskMonitorTest.this.cf.checkExists().forPath(WorkerTaskMonitorTest.joiner.join(WorkerTaskMonitorTest.tasksPath, WorkerTaskMonitorTest.this.task.getId(), new Object[0])) == null;
                } catch (Exception e) {
                    return false;
                }
            }
        }));
        this.cf.create().creatingParentsIfNeeded().forPath(joiner.join(tasksPath, this.task.getId(), new Object[0]), this.jsonMapper.writeValueAsBytes(this.task));
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.worker.WorkerTaskMonitorTest.4
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                try {
                    return ((TaskAnnouncement) WorkerTaskMonitorTest.this.jsonMapper.readValue((byte[]) WorkerTaskMonitorTest.this.cf.getData().forPath(WorkerTaskMonitorTest.joiner.join(WorkerTaskMonitorTest.statusPath, WorkerTaskMonitorTest.this.task.getId(), new Object[0])), TaskAnnouncement.class)).getTaskStatus().isComplete();
                } catch (Exception e) {
                    return false;
                }
            }
        }));
        TaskAnnouncement taskAnnouncement = (TaskAnnouncement) this.jsonMapper.readValue((byte[]) this.cf.getData().forPath(joiner.join(statusPath, this.task.getId(), new Object[0])), TaskAnnouncement.class);
        Assert.assertEquals(this.task.getId(), taskAnnouncement.getTaskStatus().getId());
        Assert.assertEquals(TaskStatus.Status.SUCCESS, taskAnnouncement.getTaskStatus().getStatusCode());
    }

    @Test(timeout = 30000)
    public void testGetAnnouncements() throws Exception {
        this.cf.create().creatingParentsIfNeeded().forPath(joiner.join(tasksPath, this.task.getId(), new Object[0]), this.jsonMapper.writeValueAsBytes(this.task));
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.worker.WorkerTaskMonitorTest.5
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                try {
                    return ((TaskAnnouncement) WorkerTaskMonitorTest.this.jsonMapper.readValue((byte[]) WorkerTaskMonitorTest.this.cf.getData().forPath(WorkerTaskMonitorTest.joiner.join(WorkerTaskMonitorTest.statusPath, WorkerTaskMonitorTest.this.task.getId(), new Object[0])), TaskAnnouncement.class)).getTaskStatus().isComplete();
                } catch (Exception e) {
                    return false;
                }
            }
        }));
        List announcements = this.workerCuratorCoordinator.getAnnouncements();
        Assert.assertEquals(1L, announcements.size());
        Assert.assertEquals(this.task.getId(), ((TaskAnnouncement) announcements.get(0)).getTaskStatus().getId());
        Assert.assertEquals(TaskStatus.Status.SUCCESS, ((TaskAnnouncement) announcements.get(0)).getTaskStatus().getStatusCode());
        Assert.assertEquals(DUMMY_NODE.getHost(), ((TaskAnnouncement) announcements.get(0)).getTaskLocation().getHost());
        Assert.assertEquals(DUMMY_NODE.getPort(), ((TaskAnnouncement) announcements.get(0)).getTaskLocation().getPort());
    }

    @Test(timeout = 30000)
    public void testRestartCleansOldStatus() throws Exception {
        this.task = TestTasks.unending("test");
        this.cf.create().creatingParentsIfNeeded().forPath(joiner.join(tasksPath, this.task.getId(), new Object[0]), this.jsonMapper.writeValueAsBytes(this.task));
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.worker.WorkerTaskMonitorTest.6
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                try {
                    return WorkerTaskMonitorTest.this.cf.checkExists().forPath(WorkerTaskMonitorTest.joiner.join(WorkerTaskMonitorTest.statusPath, WorkerTaskMonitorTest.this.task.getId(), new Object[0])) != null;
                } catch (Exception e) {
                    return false;
                }
            }
        }));
        this.workerTaskMonitor.stop();
        this.workerTaskMonitor = createTaskMonitor();
        this.workerTaskMonitor.start();
        List announcements = this.workerCuratorCoordinator.getAnnouncements();
        Assert.assertEquals(1L, announcements.size());
        Assert.assertEquals(this.task.getId(), ((TaskAnnouncement) announcements.get(0)).getTaskStatus().getId());
        Assert.assertEquals(TaskStatus.Status.FAILED, ((TaskAnnouncement) announcements.get(0)).getTaskStatus().getStatusCode());
    }

    @Test(timeout = 30000)
    public void testStatusAnnouncementsArePersistent() throws Exception {
        this.cf.create().creatingParentsIfNeeded().forPath(joiner.join(tasksPath, this.task.getId(), new Object[0]), this.jsonMapper.writeValueAsBytes(this.task));
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.worker.WorkerTaskMonitorTest.7
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                try {
                    return WorkerTaskMonitorTest.this.cf.checkExists().forPath(WorkerTaskMonitorTest.joiner.join(WorkerTaskMonitorTest.statusPath, WorkerTaskMonitorTest.this.task.getId(), new Object[0])) != null;
                } catch (Exception e) {
                    return false;
                }
            }
        }));
        Assert.assertEquals(0L, ((Stat) this.cf.checkExists().forPath(joiner.join(statusPath, this.task.getId(), new Object[0]))).getEphemeralOwner());
    }
}
