package org.apache.iotdb.confignode.procedure;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.confignode.procedure.entity.IncProcedure;
import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
import org.apache.iotdb.confignode.procedure.entity.StuckProcedure;
import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/iotdb/confignode/procedure/TestProcedureExecutor.class */
/**
 * Tests for {@link ProcedureExecutor}: plain procedure submission, and the worker thread count
 * while submitted procedures are held up on a semaphore.
 */
public class TestProcedureExecutor extends TestProcedureBase {
    /**
     * Value passed to {@code procExecutor.init(...)} and used as the baseline worker thread count
     * in {@link #testWorkerThreadStuck()}.
     * NOTE(review): presumably the number of core worker threads - confirm against
     * {@code ProcedureExecutor.init}.
     */
    private static final int BASE_WORKER_COUNT = 2;

    /** Upper bound, in whole seconds, on how long {@link #waitThreadCount(int)} keeps polling. */
    private static final long THREAD_COUNT_TIMEOUT_SECONDS = 30L;

    /** Pause between two polls of the worker thread count, in milliseconds. */
    private static final long THREAD_COUNT_POLL_INTERVAL_MS = 250L;

    /**
     * Builds the executor under test on top of a {@link TestProcEnv} and a
     * {@link NoopProcedureStore}, hands the executor's scheduler to the environment and
     * initializes the executor with {@link #BASE_WORKER_COUNT}.
     */
    @Override
    protected void initExecutor() {
        this.env = new TestProcEnv();
        this.procStore = new NoopProcedureStore();
        this.procExecutor = new ProcedureExecutor<>(this.env, this.procStore);
        this.env.setScheduler(this.procExecutor.getScheduler());
        this.procExecutor.init(BASE_WORKER_COUNT);
    }

    /**
     * Submits a single {@link IncProcedure}, waits for it, and checks that the environment's
     * accumulator reads 1 afterwards.
     */
    @Test
    public void testSubmitProcedure() {
        long procId = this.procExecutor.submitProcedure(new IncProcedure());
        ProcedureTestUtil.waitForProcedure(this.procExecutor, procId);
        // Expected value goes first so that a failure message reports the values correctly.
        Assert.assertEquals(1L, getEnv().getAcc().get());
    }

    /**
     * Submits two {@link StuckProcedure}s plus one {@link NoopProcedure} and checks that the
     * executor reports one extra worker thread, finishes the no-op procedure while the other two
     * are still unfinished, and returns to the baseline thread count after the semaphores are
     * released.
     *
     * @throws InterruptedException if interrupted while draining the semaphores
     */
    @Test
    public void testWorkerThreadStuck() throws InterruptedException {
        // Both semaphores start with zero available permits: two are created, two are taken.
        // NOTE(review): presumably StuckProcedure blocks until permits are released - confirm
        // against StuckProcedure.
        Semaphore firstLatch = new Semaphore(2);
        firstLatch.acquire(2);
        StuckProcedure firstStuck = new StuckProcedure(firstLatch);
        Semaphore secondLatch = new Semaphore(2);
        secondLatch.acquire(2);
        StuckProcedure secondStuck = new StuckProcedure(secondLatch);

        long firstStuckId = this.procExecutor.submitProcedure(firstStuck);
        long secondStuckId = this.procExecutor.submitProcedure(secondStuck);
        long noopId = this.procExecutor.submitProcedure(new NoopProcedure());

        // Wait until the executor reports one worker more than the baseline.
        int grownCount = waitThreadCount(BASE_WORKER_COUNT + 1);
        LOG.info("new threads got created: " + (grownCount - BASE_WORKER_COUNT));
        Assert.assertEquals(BASE_WORKER_COUNT + 1, grownCount);

        ProcedureTestUtil.waitForProcedure(this.procExecutor, noopId);
        Assert.assertTrue(this.procExecutor.isFinished(noopId));
        Assert.assertTrue(this.procExecutor.isRunning());
        Assert.assertFalse(this.procExecutor.isFinished(firstStuckId));
        Assert.assertFalse(this.procExecutor.isFinished(secondStuckId));

        // First permit for each stuck procedure.
        firstLatch.release();
        secondLatch.release();

        LOG.info("set keep alive and wait threads being removed");
        int shrunkCount = waitThreadCount(BASE_WORKER_COUNT);
        LOG.info("threads got removed: " + (grownCount - shrunkCount));
        Assert.assertEquals(BASE_WORKER_COUNT, shrunkCount);

        // Second permit for each stuck procedure, released before waiting for completion.
        // NOTE(review): presumably each StuckProcedure consumes two permits in total - confirm.
        firstLatch.release();
        secondLatch.release();

        ProcedureTestUtil.waitForProcedure(this.procExecutor, firstStuckId);
        ProcedureTestUtil.waitForProcedure(this.procExecutor, secondStuckId);
    }

    /**
     * Polls the executor's worker thread count until it equals {@code expectedCount}, the executor
     * is no longer running, or the elapsed time in whole seconds exceeds
     * {@link #THREAD_COUNT_TIMEOUT_SECONDS}.
     *
     * @param expectedCount the worker thread count to wait for
     * @return the worker thread count read after polling stops; it may differ from
     *     {@code expectedCount} if polling stopped for one of the other reasons
     */
    private int waitThreadCount(int expectedCount) {
        long startMillis = System.currentTimeMillis();
        while (this.procExecutor.isRunning()
                && TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startMillis)
                        <= THREAD_COUNT_TIMEOUT_SECONDS
                && this.procExecutor.getWorkerThreadCount() != expectedCount) {
            ProcedureTestUtil.sleepWithoutInterrupt(THREAD_COUNT_POLL_INTERVAL_MS);
        }
        return this.procExecutor.getWorkerThreadCount();
    }
}
