package io.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.indexer.TaskLocation;
import io.druid.indexer.TaskState;
import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestRealtimeTask;
import io.druid.indexing.common.TestTasks;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.RemoteTaskRunnerTestUtils;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/druid/indexing/overlord/RemoteTaskRunnerTest.class */
public class RemoteTaskRunnerTest {
    private static final Joiner joiner = RemoteTaskRunnerTestUtils.joiner;
    private static final String workerHost = "worker";
    private static final String announcementsPath = joiner.join(RemoteTaskRunnerTestUtils.announcementsPath, workerHost, new Object[0]);
    private static final String statusPath = joiner.join(RemoteTaskRunnerTestUtils.statusPath, workerHost, new Object[0]);
    private static final int TIMEOUT_SECONDS = 20;
    private RemoteTaskRunner remoteTaskRunner;
    private RemoteTaskRunnerTestUtils rtrTestUtils = new RemoteTaskRunnerTestUtils();
    private ObjectMapper jsonMapper;
    private CuratorFramework cf;
    private Task task;
    private Worker worker;

    @Before
    public void setUp() throws Exception {
        this.rtrTestUtils.setUp();
        this.jsonMapper = this.rtrTestUtils.getObjectMapper();
        this.cf = this.rtrTestUtils.getCuratorFramework();
        this.task = TestTasks.unending("task");
    }

    @After
    public void tearDown() throws Exception {
        if (this.remoteTaskRunner != null) {
            this.remoteTaskRunner.stop();
        }
        this.rtrTestUtils.tearDown();
    }

    @Test
    public void testRun() throws Exception {
        doSetup();
        ListenableFuture<TaskStatus> run = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue(taskAnnounced(this.task.getId()));
        mockWorkerRunningTask(this.task);
        Assert.assertTrue(workerRunningTask(this.task.getId()));
        mockWorkerCompleteSuccessfulTask(this.task);
        Assert.assertTrue(workerCompletedTask(run));
        Assert.assertEquals(this.task.getId(), ((TaskStatus) run.get()).getId());
        Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) run.get()).getStatusCode());
    }

    @Test
    public void testStartWithNoWorker() throws Exception {
        makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT1S")));
    }

    @Test
    public void testRunExistingTaskThatHasntStartedRunning() throws Exception {
        doSetup();
        this.remoteTaskRunner.run(this.task);
        Assert.assertTrue(taskAnnounced(this.task.getId()));
        ListenableFuture<TaskStatus> run = this.remoteTaskRunner.run(this.task);
        Assert.assertFalse(run.isDone());
        mockWorkerRunningTask(this.task);
        Assert.assertTrue(workerRunningTask(this.task.getId()));
        mockWorkerCompleteSuccessfulTask(this.task);
        Assert.assertTrue(workerCompletedTask(run));
        Assert.assertEquals(this.task.getId(), ((TaskStatus) run.get()).getId());
        Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) run.get()).getStatusCode());
    }

    @Test
    public void testRunExistingTaskThatHasStartedRunning() throws Exception {
        doSetup();
        this.remoteTaskRunner.run(this.task);
        Assert.assertTrue(taskAnnounced(this.task.getId()));
        mockWorkerRunningTask(this.task);
        Assert.assertTrue(workerRunningTask(this.task.getId()));
        ListenableFuture<TaskStatus> run = this.remoteTaskRunner.run(this.task);
        Assert.assertFalse(run.isDone());
        mockWorkerCompleteSuccessfulTask(this.task);
        Assert.assertTrue(workerCompletedTask(run));
        Assert.assertEquals(this.task.getId(), ((TaskStatus) run.get()).getId());
        Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) run.get()).getStatusCode());
    }

    @Test
    public void testRunTooMuchZKData() throws Exception {
        ServiceEmitter serviceEmitter = (ServiceEmitter) EasyMock.createMock(ServiceEmitter.class);
        EmittingLogger.registerEmitter(serviceEmitter);
        EasyMock.replay(new Object[]{serviceEmitter});
        doSetup();
        this.remoteTaskRunner.run(TestTasks.unending(new String(new char[5000])));
        EasyMock.verify(new Object[]{serviceEmitter});
    }

    @Test
    public void testRunSameAvailabilityGroup() throws Exception {
        doSetup();
        TestRealtimeTask testRealtimeTask = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), this.jsonMapper);
        this.remoteTaskRunner.run(testRealtimeTask);
        Assert.assertTrue(taskAnnounced(testRealtimeTask.getId()));
        mockWorkerRunningTask(testRealtimeTask);
        this.remoteTaskRunner.run(new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"), this.jsonMapper));
        this.remoteTaskRunner.run(new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"), this.jsonMapper));
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.1
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                return RemoteTaskRunnerTest.this.remoteTaskRunner.getRunningTasks().size() == 2;
            }
        }));
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.2
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                return RemoteTaskRunnerTest.this.remoteTaskRunner.getPendingTasks().size() == 1;
            }
        }));
        Assert.assertTrue(((RemoteTaskRunnerWorkItem) this.remoteTaskRunner.getPendingTasks().iterator().next()).getTaskId().equals("rt2"));
    }

    @Test
    public void testRunWithCapacity() throws Exception {
        doSetup();
        TestRealtimeTask testRealtimeTask = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), this.jsonMapper);
        this.remoteTaskRunner.run(testRealtimeTask);
        Assert.assertTrue(taskAnnounced(testRealtimeTask.getId()));
        mockWorkerRunningTask(testRealtimeTask);
        this.remoteTaskRunner.run(new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"), this.jsonMapper));
        TestRealtimeTask testRealtimeTask2 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"), this.jsonMapper);
        this.remoteTaskRunner.run(testRealtimeTask2);
        Assert.assertTrue(taskAnnounced(testRealtimeTask2.getId()));
        mockWorkerRunningTask(testRealtimeTask2);
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.3
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                return RemoteTaskRunnerTest.this.remoteTaskRunner.getRunningTasks().size() == 2;
            }
        }));
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.4
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                return RemoteTaskRunnerTest.this.remoteTaskRunner.getPendingTasks().size() == 1;
            }
        }));
        Assert.assertTrue(((RemoteTaskRunnerWorkItem) this.remoteTaskRunner.getPendingTasks().iterator().next()).getTaskId().equals("rt2"));
    }

    @Test
    public void testStatusRemoved() throws Exception {
        doSetup();
        ListenableFuture run = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue(taskAnnounced(this.task.getId()));
        mockWorkerRunningTask(this.task);
        Assert.assertTrue(workerRunningTask(this.task.getId()));
        Assert.assertTrue(((RemoteTaskRunnerWorkItem) this.remoteTaskRunner.getRunningTasks().iterator().next()).getTaskId().equals("task"));
        this.cf.delete().forPath(joiner.join(statusPath, this.task.getId(), new Object[0]));
        Assert.assertEquals(((TaskStatus) run.get(20L, TimeUnit.SECONDS)).getStatusCode(), TaskState.FAILED);
    }

    @Test
    public void testBootstrap() throws Exception {
        ((ACLBackgroundPathAndBytesable) this.cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(joiner.join(statusPath, "first", new Object[0]), this.jsonMapper.writeValueAsBytes(TaskStatus.running("first")));
        ((ACLBackgroundPathAndBytesable) this.cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(joiner.join(statusPath, "second", new Object[0]), this.jsonMapper.writeValueAsBytes(TaskStatus.running("second")));
        doSetup();
        HashSet newHashSet = Sets.newHashSet();
        Iterator it = this.remoteTaskRunner.getWorkers().iterator();
        while (it.hasNext()) {
            newHashSet.addAll(((ImmutableWorkerInfo) it.next()).getRunningTasks());
        }
        Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), newHashSet);
        Assert.assertEquals("runningTasks", ImmutableSet.of("first", "second"), Sets.newHashSet(Iterables.transform(this.remoteTaskRunner.getRunningTasks(), new Function<RemoteTaskRunnerWorkItem, String>() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.5
            public String apply(RemoteTaskRunnerWorkItem remoteTaskRunnerWorkItem) {
                return remoteTaskRunnerWorkItem.getTaskId();
            }
        })));
    }

    @Test
    public void testRunWithTaskComplete() throws Exception {
        ((ACLBackgroundPathAndBytesable) this.cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(joiner.join(statusPath, this.task.getId(), new Object[0]), this.jsonMapper.writeValueAsBytes(TaskStatus.success(this.task.getId())));
        doSetup();
        Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) this.remoteTaskRunner.run(this.task).get(20L, TimeUnit.SECONDS)).getStatusCode());
    }

    @Test
    public void testWorkerRemoved() throws Exception {
        doSetup();
        ListenableFuture run = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue(taskAnnounced(this.task.getId()));
        mockWorkerRunningTask(this.task);
        Assert.assertTrue(workerRunningTask(this.task.getId()));
        this.cf.delete().forPath(announcementsPath);
        Assert.assertEquals(TaskState.FAILED, ((TaskStatus) run.get(20L, TimeUnit.SECONDS)).getStatusCode());
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.6
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                return RemoteTaskRunnerTest.this.remoteTaskRunner.getRemovedWorkerCleanups().isEmpty();
            }
        }, this.remoteTaskRunner.getRemoteTaskRunnerConfig().getTaskCleanupTimeout().toStandardDuration().getMillis() * 2));
        Assert.assertNull(this.cf.checkExists().forPath(statusPath));
    }

    @Test
    public void testWorkerDisabled() throws Exception {
        doSetup();
        ListenableFuture<TaskStatus> run = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue(taskAnnounced(this.task.getId()));
        mockWorkerRunningTask(this.task);
        Assert.assertTrue(workerRunningTask(this.task.getId()));
        disableWorker();
        mockWorkerCompleteSuccessfulTask(this.task);
        Assert.assertTrue(workerCompletedTask(run));
        Assert.assertEquals(this.task.getId(), ((TaskStatus) run.get()).getId());
        Assert.assertEquals(TaskState.SUCCESS, ((TaskStatus) run.get()).getStatusCode());
        Assert.assertEquals("", ((ImmutableWorkerInfo) Iterables.getOnlyElement(this.remoteTaskRunner.getWorkers())).getWorker().getVersion());
    }

    private void doSetup() throws Exception {
        makeWorker();
        makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT5S")));
    }

    private void makeRemoteTaskRunner(RemoteTaskRunnerConfig remoteTaskRunnerConfig) throws Exception {
        this.remoteTaskRunner = this.rtrTestUtils.makeRemoteTaskRunner(remoteTaskRunnerConfig);
    }

    private void makeWorker() throws Exception {
        this.worker = this.rtrTestUtils.makeWorker(workerHost, 3);
    }

    private void disableWorker() throws Exception {
        this.rtrTestUtils.disableWorker(this.worker);
    }

    private boolean taskAnnounced(String str) {
        return this.rtrTestUtils.taskAnnounced(workerHost, str);
    }

    private boolean workerRunningTask(String str) {
        return this.rtrTestUtils.workerRunningTask(workerHost, str);
    }

    private boolean workerCompletedTask(final ListenableFuture<TaskStatus> listenableFuture) {
        return TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.7
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                return listenableFuture.isDone();
            }
        });
    }

    private void mockWorkerRunningTask(Task task) throws Exception {
        this.rtrTestUtils.mockWorkerRunningTask(workerHost, task);
    }

    private void mockWorkerCompleteSuccessfulTask(Task task) throws Exception {
        this.rtrTestUtils.mockWorkerCompleteSuccessfulTask(workerHost, task);
    }

    private void mockWorkerCompleteFailedTask(Task task) throws Exception {
        this.rtrTestUtils.mockWorkerCompleteFailedTask(workerHost, task);
    }

    @Test
    public void testFindLazyWorkerTaskRunning() throws Exception {
        doSetup();
        this.remoteTaskRunner.start();
        this.remoteTaskRunner.run(this.task);
        Assert.assertTrue(taskAnnounced(this.task.getId()));
        mockWorkerRunningTask(this.task);
        Assert.assertTrue(this.remoteTaskRunner.markWorkersLazy(new Predicate<ImmutableWorkerInfo>() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.8
            public boolean apply(ImmutableWorkerInfo immutableWorkerInfo) {
                return true;
            }
        }, 1).isEmpty());
        Assert.assertTrue(this.remoteTaskRunner.getLazyWorkers().isEmpty());
        Assert.assertEquals(1L, this.remoteTaskRunner.getWorkers().size());
    }

    @Test
    public void testFindLazyWorkerForWorkerJustAssignedTask() throws Exception {
        doSetup();
        this.remoteTaskRunner.run(this.task);
        Assert.assertTrue(taskAnnounced(this.task.getId()));
        Assert.assertTrue(this.remoteTaskRunner.markWorkersLazy(new Predicate<ImmutableWorkerInfo>() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.9
            public boolean apply(ImmutableWorkerInfo immutableWorkerInfo) {
                return true;
            }
        }, 1).isEmpty());
        Assert.assertTrue(this.remoteTaskRunner.getLazyWorkers().isEmpty());
        Assert.assertEquals(1L, this.remoteTaskRunner.getWorkers().size());
    }

    @Test
    public void testFindLazyWorkerNotRunningAnyTask() throws Exception {
        doSetup();
        Assert.assertEquals(1L, this.remoteTaskRunner.markWorkersLazy(new Predicate<ImmutableWorkerInfo>() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.10
            public boolean apply(ImmutableWorkerInfo immutableWorkerInfo) {
                return true;
            }
        }, 1).size());
        Assert.assertEquals(1L, this.remoteTaskRunner.getLazyWorkers().size());
    }

    @Test
    public void testFindLazyWorkerNotRunningAnyTaskButWithZeroMaxWorkers() throws Exception {
        doSetup();
        Assert.assertEquals(0L, this.remoteTaskRunner.markWorkersLazy(new Predicate<ImmutableWorkerInfo>() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.11
            public boolean apply(ImmutableWorkerInfo immutableWorkerInfo) {
                return true;
            }
        }, 0).size());
        Assert.assertEquals(0L, this.remoteTaskRunner.getLazyWorkers().size());
    }

    @Test
    public void testWorkerZKReconnect() throws Exception {
        makeWorker();
        makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT5M")));
        ListenableFuture run = this.remoteTaskRunner.run(this.task);
        Assert.assertTrue(taskAnnounced(this.task.getId()));
        mockWorkerRunningTask(this.task);
        Assert.assertTrue(workerRunningTask(this.task.getId()));
        byte[] bArr = (byte[]) this.cf.getData().forPath(announcementsPath);
        this.cf.delete().forPath(announcementsPath);
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.12
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                return RemoteTaskRunnerTest.this.remoteTaskRunner.getRemovedWorkerCleanups().containsKey(RemoteTaskRunnerTest.this.worker.getHost());
            }
        }));
        this.cf.create().forPath(announcementsPath, bArr);
        Assert.assertTrue(TestUtils.conditionValid(new IndexingServiceCondition() { // from class: io.druid.indexing.overlord.RemoteTaskRunnerTest.13
            @Override // io.druid.indexing.common.IndexingServiceCondition
            public boolean isValid() {
                return !RemoteTaskRunnerTest.this.remoteTaskRunner.getRemovedWorkerCleanups().containsKey(RemoteTaskRunnerTest.this.worker.getHost());
            }
        }));
        mockWorkerCompleteSuccessfulTask(this.task);
        TaskStatus taskStatus = (TaskStatus) run.get(20L, TimeUnit.SECONDS);
        Assert.assertEquals(taskStatus.getStatusCode(), TaskState.SUCCESS);
        Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
    }

    @Test
    public void testSortByInsertionTime() throws Exception {
        RemoteTaskRunnerWorkItem withQueueInsertionTime = new RemoteTaskRunnerWorkItem("b", (Worker) null, (TaskLocation) null).withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:03Z"));
        RemoteTaskRunnerWorkItem withQueueInsertionTime2 = new RemoteTaskRunnerWorkItem("a", (Worker) null, (TaskLocation) null).withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:02Z"));
        RemoteTaskRunnerWorkItem withQueueInsertionTime3 = new RemoteTaskRunnerWorkItem("c", (Worker) null, (TaskLocation) null).withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:01Z"));
        ArrayList newArrayList = Lists.newArrayList(new RemoteTaskRunnerWorkItem[]{withQueueInsertionTime, withQueueInsertionTime2, withQueueInsertionTime3});
        RemoteTaskRunner.sortByInsertionTime(newArrayList);
        Assert.assertEquals(withQueueInsertionTime3, newArrayList.get(0));
        Assert.assertEquals(withQueueInsertionTime2, newArrayList.get(1));
        Assert.assertEquals(withQueueInsertionTime, newArrayList.get(2));
    }

    @Test
    public void testBlacklistZKWorkers() throws Exception {
        Period millis = Period.millis(1000);
        makeWorker();
        TestRemoteTaskRunnerConfig testRemoteTaskRunnerConfig = new TestRemoteTaskRunnerConfig(millis);
        testRemoteTaskRunnerConfig.setMaxPercentageBlacklistWorkers(100);
        makeRemoteTaskRunner(testRemoteTaskRunnerConfig);
        TestRealtimeTask testRealtimeTask = new TestRealtimeTask("realtime1", new TaskResource("realtime1", 1), "foo", TaskStatus.success("realtime1"), this.jsonMapper);
        ListenableFuture run = this.remoteTaskRunner.run(testRealtimeTask);
        Assert.assertTrue(taskAnnounced(testRealtimeTask.getId()));
        mockWorkerRunningTask(testRealtimeTask);
        mockWorkerCompleteFailedTask(testRealtimeTask);
        Assert.assertTrue(((TaskStatus) run.get(20L, TimeUnit.SECONDS)).isFailure());
        Assert.assertEquals(0L, this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals(1L, this.remoteTaskRunner.findWorkerRunningTask(testRealtimeTask.getId()).getContinuouslyFailedTasksCount());
        TestRealtimeTask testRealtimeTask2 = new TestRealtimeTask("realtime2", new TaskResource("realtime2", 1), "foo", TaskStatus.running("realtime2"), this.jsonMapper);
        ListenableFuture run2 = this.remoteTaskRunner.run(testRealtimeTask2);
        Assert.assertTrue(taskAnnounced(testRealtimeTask2.getId()));
        mockWorkerRunningTask(testRealtimeTask2);
        mockWorkerCompleteFailedTask(testRealtimeTask2);
        Assert.assertTrue(((TaskStatus) run2.get(20L, TimeUnit.SECONDS)).isFailure());
        Assert.assertEquals(1L, this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals(2L, this.remoteTaskRunner.findWorkerRunningTask(testRealtimeTask2.getId()).getContinuouslyFailedTasksCount());
        ((RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner) this.remoteTaskRunner).setCurrentTimeMillis(System.currentTimeMillis());
        this.remoteTaskRunner.checkBlackListedNodes();
        Assert.assertEquals(1L, this.remoteTaskRunner.getBlackListedWorkers().size());
        ((RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner) this.remoteTaskRunner).setCurrentTimeMillis(System.currentTimeMillis() + (2 * millis.toStandardDuration().getMillis()));
        this.remoteTaskRunner.checkBlackListedNodes();
        Assert.assertEquals(0L, this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals(0L, this.remoteTaskRunner.findWorkerRunningTask(testRealtimeTask2.getId()).getContinuouslyFailedTasksCount());
        TestRealtimeTask testRealtimeTask3 = new TestRealtimeTask("realtime3", new TaskResource("realtime3", 1), "foo", TaskStatus.running("realtime3"), this.jsonMapper);
        ListenableFuture run3 = this.remoteTaskRunner.run(testRealtimeTask3);
        Assert.assertTrue(taskAnnounced(testRealtimeTask3.getId()));
        mockWorkerRunningTask(testRealtimeTask3);
        mockWorkerCompleteSuccessfulTask(testRealtimeTask3);
        Assert.assertTrue(((TaskStatus) run3.get(20L, TimeUnit.SECONDS)).isSuccess());
        Assert.assertEquals(0L, this.remoteTaskRunner.getBlackListedWorkers().size());
        Assert.assertEquals(0L, this.remoteTaskRunner.findWorkerRunningTask(testRealtimeTask3.getId()).getContinuouslyFailedTasksCount());
    }

    @Test
    public void testBlacklistZKWorkers25Percent() throws Exception {
        Period millis = Period.millis(1000);
        this.rtrTestUtils.makeWorker(workerHost, 10);
        this.rtrTestUtils.makeWorker("worker2", 10);
        TestRemoteTaskRunnerConfig testRemoteTaskRunnerConfig = new TestRemoteTaskRunnerConfig(millis);
        testRemoteTaskRunnerConfig.setMaxPercentageBlacklistWorkers(25);
        makeRemoteTaskRunner(testRemoteTaskRunnerConfig);
        String str = null;
        String str2 = null;
        for (int i = 1; i < 13; i++) {
            String format = StringUtils.format("rt-%d", new Object[]{Integer.valueOf(i)});
            Task testRealtimeTask = new TestRealtimeTask(format, new TaskResource(format, 1), "foo", TaskStatus.success(format), this.jsonMapper);
            ListenableFuture run = this.remoteTaskRunner.run(testRealtimeTask);
            if (i == 1) {
                if (this.rtrTestUtils.taskAnnounced("worker2", testRealtimeTask.getId())) {
                    str = "worker2";
                    str2 = workerHost;
                } else {
                    str = workerHost;
                    str2 = "worker2";
                }
            }
            String str3 = i % 2 == 0 ? str2 : str;
            Assert.assertTrue(this.rtrTestUtils.taskAnnounced(str3, testRealtimeTask.getId()));
            this.rtrTestUtils.mockWorkerRunningTask(str3, testRealtimeTask);
            this.rtrTestUtils.mockWorkerCompleteFailedTask(str3, testRealtimeTask);
            Assert.assertTrue(((TaskStatus) run.get(20L, TimeUnit.SECONDS)).isFailure());
            Assert.assertEquals(0L, this.remoteTaskRunner.getBlackListedWorkers().size());
            Assert.assertEquals((i + 1) / 2, this.remoteTaskRunner.findWorkerRunningTask(testRealtimeTask.getId()).getContinuouslyFailedTasksCount());
        }
    }

    @Test
    public void testBlacklistZKWorkers50Percent() throws Exception {
        Period millis = Period.millis(1000);
        this.rtrTestUtils.makeWorker(workerHost, 10);
        this.rtrTestUtils.makeWorker("worker2", 10);
        TestRemoteTaskRunnerConfig testRemoteTaskRunnerConfig = new TestRemoteTaskRunnerConfig(millis);
        testRemoteTaskRunnerConfig.setMaxPercentageBlacklistWorkers(50);
        makeRemoteTaskRunner(testRemoteTaskRunnerConfig);
        String str = null;
        String str2 = null;
        int i = 1;
        while (i < 13) {
            String format = StringUtils.format("rt-%d", new Object[]{Integer.valueOf(i)});
            Task testRealtimeTask = new TestRealtimeTask(format, new TaskResource(format, 1), "foo", TaskStatus.success(format), this.jsonMapper);
            ListenableFuture run = this.remoteTaskRunner.run(testRealtimeTask);
            if (i == 1) {
                if (this.rtrTestUtils.taskAnnounced("worker2", testRealtimeTask.getId())) {
                    str = "worker2";
                    str2 = workerHost;
                } else {
                    str = workerHost;
                    str2 = "worker2";
                }
            }
            String str3 = (i % 2 == 0 || i > 4) ? str2 : str;
            Assert.assertTrue(this.rtrTestUtils.taskAnnounced(str3, testRealtimeTask.getId()));
            this.rtrTestUtils.mockWorkerRunningTask(str3, testRealtimeTask);
            this.rtrTestUtils.mockWorkerCompleteFailedTask(str3, testRealtimeTask);
            Assert.assertTrue(((TaskStatus) run.get(20L, TimeUnit.SECONDS)).isFailure());
            Assert.assertEquals(i > 2 ? 1L : 0L, this.remoteTaskRunner.getBlackListedWorkers().size());
            Assert.assertEquals(i > 4 ? i - 2 : (i + 1) / 2, this.remoteTaskRunner.findWorkerRunningTask(testRealtimeTask.getId()).getContinuouslyFailedTasksCount());
            i++;
        }
    }

    @Test
    public void testSuccessfulTaskOnBlacklistedWorker() throws Exception {
        Period millis = Period.millis(1000);
        makeWorker();
        TestRemoteTaskRunnerConfig testRemoteTaskRunnerConfig = new TestRemoteTaskRunnerConfig(millis);
        testRemoteTaskRunnerConfig.setMaxPercentageBlacklistWorkers(100);
        makeRemoteTaskRunner(testRemoteTaskRunnerConfig);
        TestRealtimeTask testRealtimeTask = new TestRealtimeTask("realtime1", new TaskResource("realtime1", 1), "foo", TaskStatus.success("realtime1"), this.jsonMapper);
        TestRealtimeTask testRealtimeTask2 = new TestRealtimeTask("realtime2", new TaskResource("realtime2", 1), "foo", TaskStatus.success("realtime2"), this.jsonMapper);
        TestRealtimeTask testRealtimeTask3 = new TestRealtimeTask("realtime3", new TaskResource("realtime3", 1), "foo", TaskStatus.success("realtime3"), this.jsonMapper);
        ListenableFuture run = this.remoteTaskRunner.run(testRealtimeTask);
        Assert.assertTrue(taskAnnounced(testRealtimeTask.getId()));
        mockWorkerRunningTask(testRealtimeTask);
        mockWorkerCompleteFailedTask(testRealtimeTask);
        Assert.assertTrue(((TaskStatus) run.get(20L, TimeUnit.SECONDS)).isFailure());
        Assert.assertEquals(0L, this.remoteTaskRunner.getBlackListedWorkers().size());
        ListenableFuture run2 = this.remoteTaskRunner.run(testRealtimeTask2);
        Assert.assertTrue(taskAnnounced(testRealtimeTask2.getId()));
        mockWorkerRunningTask(testRealtimeTask2);
        ListenableFuture run3 = this.remoteTaskRunner.run(testRealtimeTask3);
        Assert.assertTrue(taskAnnounced(testRealtimeTask3.getId()));
        mockWorkerRunningTask(testRealtimeTask3);
        mockWorkerCompleteFailedTask(testRealtimeTask3);
        Assert.assertTrue(((TaskStatus) run3.get(20L, TimeUnit.SECONDS)).isFailure());
        Assert.assertEquals(1L, this.remoteTaskRunner.getBlackListedWorkers().size());
        mockWorkerCompleteSuccessfulTask(testRealtimeTask2);
        Assert.assertTrue(((TaskStatus) run2.get(20L, TimeUnit.SECONDS)).isSuccess());
        Assert.assertEquals(0L, this.remoteTaskRunner.getBlackListedWorkers().size());
    }
}
