package org.apache.kafka.connect.integration;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Integration-tagged tests for the result of {@code StartAndStopLatch.await(long, TimeUnit)}.
 *
 * <p>Each test builds a latch expecting one start and one stop, begins waiting on it from a
 * single-thread executor, records zero or more start/stop events from the test thread, and then
 * asserts the boolean that the asynchronous wait produced.
 */
@Tag("integration")
public class StartAndStopLatchTest {
    /** Set to {@code true} by {@link #complete(StartAndStopLatch)}; no test in this class asserts on it. */
    private final AtomicBoolean completed = new AtomicBoolean();
    /** Clock handed to every latch; replaced with a fresh {@link MockTime} before each test. */
    private Time clock;
    /** The latch being awaited by {@link #asyncAwait(long, TimeUnit)}. */
    private StartAndStopLatch latch;
    /** Dependent latches passed to the latch under test; stays {@code null} unless a test assigns it. */
    private List<StartAndStopLatch> dependents;
    /** Single-thread executor that runs the blocking await off the test thread. */
    private ExecutorService waiters;
    /** Outcome of the most recent asynchronous await. */
    private Future<Boolean> future;

    /** Creates a fresh mock clock and waiter executor for each test. */
    @BeforeEach
    public void setup() {
        clock = new MockTime();
        waiters = Executors.newSingleThreadExecutor();
    }

    /** Stops the waiter executor, interrupting any await that is still blocked. */
    @AfterEach
    public void teardown() {
        if (waiters != null) {
            waiters.shutdownNow();
        }
    }

    /** With no start and no stop recorded, the await must report {@code false}. */
    @Test
    public void shouldReturnFalseWhenAwaitingForStartToNeverComplete() throws Throwable {
        latch = newLatch(dependents);
        future = asyncAwait(100);
        clock.sleep(10);
        assertAwaitResult(false);
    }

    /** With a start but no stop recorded, the await must report {@code false}. */
    @Test
    public void shouldReturnFalseWhenAwaitingForStopToNeverComplete() throws Throwable {
        latch = newLatch(dependents);
        future = asyncAwait(100);
        latch.recordStart();
        clock.sleep(10);
        assertAwaitResult(false);
    }

    /** With both the start and the stop recorded, the await must report {@code true}. */
    @Test
    public void shouldReturnTrueWhenAwaitingForStartAndStopToComplete() throws Throwable {
        latch = newLatch(dependents);
        future = asyncAwait(100);
        latch.recordStart();
        latch.recordStop();
        clock.sleep(10);
        assertAwaitResult(true);
    }

    /**
     * The latch itself records its start and stop, but its single dependent latch records
     * nothing, so the await must report {@code false}.
     */
    @Test
    public void shouldReturnFalseWhenAwaitingForDependentLatchToComplete() throws Throwable {
        dependents = Collections.singletonList(newLatch(null));
        latch = newLatch(dependents);
        future = asyncAwait(100);
        latch.recordStart();
        latch.recordStop();
        clock.sleep(10);
        assertAwaitResult(false);
    }

    /**
     * Both the latch and its single dependent latch record a start and a stop, so the await
     * must report {@code true}.
     */
    @Test
    public void shouldReturnTrueWhenAwaitingForStartAndStopAndDependentLatch() throws Throwable {
        StartAndStopLatch dependent = newLatch(null);
        dependents = Collections.singletonList(dependent);
        latch = newLatch(dependents);
        future = asyncAwait(100);
        latch.recordStart();
        latch.recordStop();
        dependent.recordStart();
        dependent.recordStop();
        clock.sleep(10);
        assertAwaitResult(true);
    }

    /**
     * Builds a latch expecting one start and one stop, wired to {@link #complete(StartAndStopLatch)}
     * and the test clock.
     *
     * @param dependentLatches the dependents to hand to the new latch; may be {@code null}
     * @return the newly constructed latch
     */
    private StartAndStopLatch newLatch(List<StartAndStopLatch> dependentLatches) {
        return new StartAndStopLatch(1, 1, this::complete, dependentLatches, clock);
    }

    /**
     * Waits up to 200 real milliseconds for the pending asynchronous await and checks both its
     * value and that the future has finished.
     *
     * @param expected the value the await is expected to have produced
     * @throws Exception if the future times out, is interrupted, or its task failed
     */
    private void assertAwaitResult(boolean expected) throws Exception {
        boolean actual = future.get(200, TimeUnit.MILLISECONDS);
        if (expected) {
            Assertions.assertTrue(actual);
        } else {
            Assertions.assertFalse(actual);
        }
        Assertions.assertTrue(future.isDone());
    }

    /**
     * Starts an asynchronous await on {@link #latch} with a timeout given in milliseconds.
     *
     * @param duration the timeout in milliseconds
     * @return a future holding the await result
     */
    private Future<Boolean> asyncAwait(long duration) {
        return asyncAwait(duration, TimeUnit.MILLISECONDS);
    }

    /**
     * Submits a task to the waiter executor that blocks in {@code latch.await(duration, unit)}.
     *
     * @param duration the timeout to pass to the latch
     * @param unit     the unit of {@code duration}
     * @return a future holding the await result, or {@code false} if the waiting thread was
     *         interrupted
     */
    private Future<Boolean> asyncAwait(long duration, TimeUnit unit) {
        return waiters.submit(() -> {
            try {
                return latch.await(duration, unit);
            } catch (InterruptedException e) {
                // Re-assert the interrupt rather than clearing it, so the interruption
                // (e.g. from shutdownNow() in teardown) is not silently swallowed.
                Thread.currentThread().interrupt();
                return false;
            }
        });
    }

    /**
     * Completion callback handed to every latch; flags {@link #completed}.
     *
     * @param startAndStopLatch the latch passed to the callback (unused)
     */
    private void complete(StartAndStopLatch startAndStopLatch) {
        completed.set(true);
    }
}
