package org.apache.kafka.connect.integration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link StartAndStopCounter}: verifies that starts and stops are counted, and that a
 * latch obtained from {@code expectedRestarts} reports whether the expected restart was recorded.
 */
@Tag("integration")
public class StartAndStopCounterTest {
    private StartAndStopCounter counter;
    private Time clock;
    // Only created by tests that await a latch on a background thread; shut down in teardown().
    private ExecutorService waiters;
    private StartAndStopLatch latch;

    /** Creates a fresh counter backed by a mock clock before each test. */
    @BeforeEach
    public void setup() {
        clock = new MockTime();
        counter = new StartAndStopCounter(clock);
    }

    /** Stops the background executor, if a test created one, interrupting any pending await. */
    @AfterEach
    public void teardown() {
        if (waiters == null) {
            return;
        }
        try {
            waiters.shutdownNow();
        } finally {
            waiters = null;
        }
    }

    /** Each {@code recordStart()} call must increase {@code starts()} by one. */
    @Test
    public void shouldRecordStarts() {
        Assertions.assertEquals(0, counter.starts());
        counter.recordStart();
        Assertions.assertEquals(1, counter.starts());
        counter.recordStart();
        Assertions.assertEquals(2, counter.starts());
        // Reading the count a second time must yield the same value.
        Assertions.assertEquals(2, counter.starts());
    }

    /** Each {@code recordStop()} call must increase {@code stops()} by one. */
    @Test
    public void shouldRecordStops() {
        Assertions.assertEquals(0, counter.stops());
        counter.recordStop();
        Assertions.assertEquals(1, counter.stops());
        counter.recordStop();
        Assertions.assertEquals(2, counter.stops());
        // Reading the count a second time must yield the same value.
        Assertions.assertEquals(2, counter.stops());
    }

    /**
     * A latch expecting one restart must complete successfully once a stop followed by a start
     * has been recorded.
     */
    @Test
    public void shouldExpectRestarts() throws Exception {
        waiters = Executors.newSingleThreadExecutor();
        latch = counter.expectedRestarts(1);
        Future<Boolean> awaitResult = asyncAwait(100L, TimeUnit.MILLISECONDS);

        clock.sleep(1000L);
        counter.recordStop();
        counter.recordStart();

        Assertions.assertTrue(awaitResult.get(200L, TimeUnit.MILLISECONDS));
        Assertions.assertTrue(awaitResult.isDone());
    }

    /**
     * A latch expecting one restart must report failure when only the stop, and never the
     * matching start, is recorded.
     */
    @Test
    public void shouldFailToWaitForRestartThatNeverHappens() throws Exception {
        waiters = Executors.newSingleThreadExecutor();
        latch = counter.expectedRestarts(1);
        Future<Boolean> awaitResult = asyncAwait(100L, TimeUnit.MILLISECONDS);

        clock.sleep(1000L);
        counter.recordStop();

        Assertions.assertFalse(awaitResult.get(200L, TimeUnit.MILLISECONDS));
        Assertions.assertTrue(awaitResult.isDone());
    }

    /**
     * Awaits the current {@code latch} on the {@code waiters} executor.
     *
     * @param duration the maximum time to wait, passed through to the latch
     * @param unit     the unit of {@code duration}
     * @return a future holding the latch's await result, or {@code false} if the waiting thread
     *         was interrupted
     */
    private Future<Boolean> asyncAwait(long duration, TimeUnit unit) {
        return waiters.submit(() -> {
            try {
                return latch.await(duration, unit);
            } catch (InterruptedException e) {
                // Restore the interrupt status instead of swallowing it, so the executor's
                // worker thread can still observe the interruption (e.g. from shutdownNow()).
                Thread.currentThread().interrupt();
                return false;
            }
        });
    }
}
