package org.apache.kafka.connect.runtime.distributed;

import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.crypto.SecretKey;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.CloseableConnectorContext;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.ErrorHandlingTaskTest;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.RestartPlan;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TooManyTasksException;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.Stage;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.class */
public class DistributedHerderTest {
    private static final Map<String, String> HERDER_CONFIG = new HashMap();
    private static final String MEMBER_URL = "memberUrl";
    private static final String CONN1 = "sourceA";
    private static final String CONN2 = "sourceB";
    private static final ConnectorTaskId TASK0;
    private static final ConnectorTaskId TASK1;
    private static final ConnectorTaskId TASK2;
    private static final Integer MAX_TASKS;
    private static final Map<String, String> CONN1_CONFIG;
    private static final String FOO_TOPIC = "foo";
    private static final String BAR_TOPIC = "bar";
    private static final String BAZ_TOPIC = "baz";
    private static final Map<String, String> CONN1_CONFIG_UPDATED;
    private static final ConfigInfos CONN1_CONFIG_INFOS;
    private static final Map<String, String> CONN2_CONFIG;
    private static final ConfigInfos CONN2_CONFIG_INFOS;
    private static final ConfigInfos CONN2_INVALID_CONFIG_INFOS;
    private static final Map<String, String> TASK_CONFIG;
    private static final List<Map<String, String>> TASK_CONFIGS;
    private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS_MAP;
    private static final ClusterConfigState SNAPSHOT;
    private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1;
    private static final ClusterConfigState SNAPSHOT_STOPPED_CONN1;
    private static final ClusterConfigState SNAPSHOT_STOPPED_CONN1_FENCED;
    private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG;
    private static final String WORKER_ID = "localhost:8083";
    private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
    private static final Runnable EMPTY_RUNNABLE;

    @Mock
    private ConfigBackingStore configBackingStore;

    @Mock
    private StatusBackingStore statusBackingStore;

    @Mock
    private WorkerGroupMember member;
    private MockTime time;
    private DistributedHerder herder;
    private MockConnectMetrics metrics;

    @Mock
    private Worker worker;

    @Mock
    private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;

    @Mock
    private Plugins plugins;

    @Mock
    private RestClient restClient;
    private ConfigBackingStore.UpdateListener configUpdateListener;
    private WorkerRebalanceListener rebalanceListener;
    private ExecutorService herderExecutor;
    private Future<?> herderFuture;
    private SinkConnectorConfig conn1SinkConfig;
    private SinkConnectorConfig conn1SinkConfigUpdated;
    private short connectProtocolVersion;
    private final CountDownLatch shutdownCalled = new CountDownLatch(1);
    private final SampleConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new SampleConnectorClientConfigOverridePolicy();

    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest$BogusSourceConnector.class */
    private static abstract class BogusSourceConnector extends SourceConnector {
        private BogusSourceConnector() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest$BogusSourceTask.class */
    private static abstract class BogusSourceTask extends SourceTask {
        private BogusSourceTask() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest$MockSynchronousExecutor.class */
    private static class MockSynchronousExecutor extends AbstractExecutorService {
        private MockSynchronousExecutor() {
        }

        @Override // java.util.concurrent.Executor
        public void execute(Runnable runnable) {
            runnable.run();
        }

        @Override // java.util.concurrent.ExecutorService
        public void shutdown() {
        }

        @Override // java.util.concurrent.ExecutorService
        public List<Runnable> shutdownNow() {
            return null;
        }

        @Override // java.util.concurrent.ExecutorService
        public boolean isShutdown() {
            return false;
        }

        @Override // java.util.concurrent.ExecutorService
        public boolean isTerminated() {
            return false;
        }

        @Override // java.util.concurrent.ExecutorService
        public boolean awaitTermination(long j, TimeUnit timeUnit) {
            return false;
        }
    }

    @BeforeEach
    public void setUp() throws Exception {
        this.time = new MockTime();
        this.metrics = new MockConnectMetrics(this.time);
        CountDownLatch countDownLatch = this.shutdownCalled;
        Objects.requireNonNull(countDownLatch);
        AutoCloseable autoCloseable = countDownLatch::countDown;
        this.connectProtocolVersion = (short) 0;
        this.herder = (DistributedHerder) Mockito.mock(DistributedHerder.class, Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS).useConstructor(new Object[]{new DistributedConfig(HERDER_CONFIG), this.worker, WORKER_ID, KAFKA_CLUSTER_ID, this.statusBackingStore, this.configBackingStore, this.member, MEMBER_URL, this.restClient, this.metrics, this.time, this.noneConnectorClientConfigOverridePolicy, Collections.emptyList(), null, new AutoCloseable[]{autoCloseable}}));
        DistributedHerder distributedHerder = this.herder;
        Objects.requireNonNull(distributedHerder);
        this.configUpdateListener = new DistributedHerder.ConfigUpdateListener(distributedHerder);
        DistributedHerder distributedHerder2 = this.herder;
        Objects.requireNonNull(distributedHerder2);
        this.rebalanceListener = new DistributedHerder.RebalanceListener(distributedHerder2, this.time);
        this.conn1SinkConfig = new SinkConnectorConfig(this.plugins, CONN1_CONFIG);
        this.conn1SinkConfigUpdated = new SinkConnectorConfig(this.plugins, CONN1_CONFIG_UPDATED);
    }

    @AfterEach
    public void tearDown() {
        if (this.metrics != null) {
            this.metrics.stop();
        }
        if (this.herderExecutor != null) {
            this.herderExecutor.shutdownNow();
            this.herderExecutor = null;
        }
    }

    @Test
    public void testJoinAssignment() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.singletonList(TASK1));
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        expectMemberPoll();
        this.herder.tick();
        this.time.sleep(1000L);
        assertStatistics(3, 1, 100.0d, 1000.0d);
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.configBackingStore, this.statusBackingStore, this.worker});
    }

    @Test
    public void testRebalance() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.singletonList(TASK1));
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        expectMemberPoll();
        this.time.sleep(1000L);
        assertStatistics(0, 0, 0.0d, Double.POSITIVE_INFINITY);
        this.herder.tick();
        this.time.sleep(2000L);
        assertStatistics(3, 1, 100.0d, 2000.0d);
        ((Worker) Mockito.verify(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) ArgumentMatchers.any());
        ((Worker) Mockito.verify(this.worker)).connectorTaskConfigs((String) ArgumentMatchers.eq(CONN1), (ConnectorConfig) ArgumentMatchers.eq(this.conn1SinkConfig));
        ((Worker) Mockito.verify(this.worker)).startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        expectRebalance((Collection<String>) Collections.singletonList(CONN1), Collections.singletonList(TASK1), (short) 0, 1L, Collections.singletonList(CONN1), Collections.emptyList());
        this.herder.tick();
        this.time.sleep(3000L);
        assertStatistics(3, 2, 100.0d, 3000.0d);
        ((Worker) Mockito.verify(this.worker, Mockito.times(2))).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) ArgumentMatchers.any());
        ((Worker) Mockito.verify(this.worker, Mockito.times(2))).connectorTaskConfigs((String) ArgumentMatchers.eq(CONN1), (ConnectorConfig) ArgumentMatchers.eq(this.conn1SinkConfig));
        ((Worker) Mockito.verify(this.worker)).startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.configBackingStore, this.statusBackingStore, this.worker});
    }

    @Test
    public void testIncrementalCooperativeRebalanceForNewMember() {
        this.connectProtocolVersion = (short) 1;
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 1);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        this.time.sleep(1000L);
        assertStatistics(0, 0, 0.0d, Double.POSITIVE_INFINITY);
        this.herder.tick();
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.singletonList(CONN1), Collections.singletonList(TASK1), 0);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        this.time.sleep(2000L);
        assertStatistics(3, 1, 100.0d, 2000.0d);
        this.herder.tick();
        this.time.sleep(3000L);
        assertStatistics(3, 2, 100.0d, 3000.0d);
        ((Worker) Mockito.verify(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) ArgumentMatchers.any());
        ((Worker) Mockito.verify(this.worker)).connectorTaskConfigs((String) ArgumentMatchers.eq(CONN1), (ConnectorConfig) ArgumentMatchers.eq(this.conn1SinkConfig));
        ((Worker) Mockito.verify(this.worker)).startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.statusBackingStore, this.configBackingStore, this.worker});
    }

    @Test
    public void testIncrementalCooperativeRebalanceForExistingMember() {
        this.connectProtocolVersion = (short) 1;
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 1);
        expectRebalance(Collections.singletonList(CONN1), Collections.singletonList(TASK1), (short) 0, 1L, Collections.emptyList(), Collections.emptyList(), 0);
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).requestRejoin();
        expectMemberPoll();
        this.herder.configState = SNAPSHOT;
        this.time.sleep(1000L);
        assertStatistics(0, 0, 0.0d, Double.POSITIVE_INFINITY);
        this.herder.tick();
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        this.time.sleep(2000L);
        assertStatistics(3, 1, 100.0d, 2000.0d);
        this.herder.tick();
        this.time.sleep(3000L);
        assertStatistics(3, 2, 100.0d, 3000.0d);
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.statusBackingStore, this.configBackingStore, this.worker});
    }

    @Test
    public void testIncrementalCooperativeRebalanceWithDelay() {
        this.connectProtocolVersion = (short) 1;
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 1);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.emptyList(), Collections.singletonList(TASK2), 10000);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK2), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        ((WorkerGroupMember) Mockito.doAnswer(invocationOnMock -> {
            this.time.sleep(9900L);
            return null;
        }).when(this.member)).poll(ArgumentMatchers.anyLong(), (Supplier) ArgumentMatchers.any());
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).requestRejoin();
        this.time.sleep(1000L);
        assertStatistics(0, 0, 0.0d, Double.POSITIVE_INFINITY);
        this.herder.tick();
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.singletonList(CONN1), Collections.singletonList(TASK1), 0);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock2 -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock3 -> {
            return TASK_CONFIGS;
        });
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        expectMemberPoll();
        this.herder.tick();
        this.time.sleep(2000L);
        assertStatistics(3, 2, 100.0d, 2000.0d);
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.statusBackingStore, this.configBackingStore, this.worker});
    }

    @Test
    public void testRebalanceFailedConnector() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.singletonList(TASK1));
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        expectMemberPoll();
        this.herder.tick();
        this.time.sleep(1000L);
        assertStatistics(3, 1, 100.0d, 1000.0d);
        ((Worker) Mockito.verify(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) ArgumentMatchers.any());
        ((Worker) Mockito.verify(this.worker)).connectorTaskConfigs((String) ArgumentMatchers.eq(CONN1), (ConnectorConfig) ArgumentMatchers.eq(this.conn1SinkConfig));
        ((Worker) Mockito.verify(this.worker)).startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        expectRebalance((Collection<String>) Collections.singletonList(CONN1), Collections.singletonList(TASK1), (short) 0, 1L, Collections.singletonList(CONN1), Collections.emptyList());
        expectExecuteTaskReconfiguration(false, null, null);
        this.herder.tick();
        this.time.sleep(2000L);
        assertStatistics(3, 2, 100.0d, 2000.0d);
        ((Worker) Mockito.verify(this.worker, Mockito.times(2))).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) ArgumentMatchers.any());
        ((Worker) Mockito.verify(this.worker)).connectorTaskConfigs((String) ArgumentMatchers.eq(CONN1), (ConnectorConfig) ArgumentMatchers.eq(this.conn1SinkConfig));
        ((Worker) Mockito.verify(this.worker)).startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.statusBackingStore, this.configBackingStore, this.worker});
    }

    @Test
    public void testRevoke() {
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        revokeAndReassign(false);
    }

    @Test
    public void testIncompleteRebalanceBeforeRevoke() {
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        revokeAndReassign(true);
    }

    public void revokeAndReassign(boolean z) {
        this.connectProtocolVersion = (short) 1;
        int i = 1;
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn(Short.valueOf(this.connectProtocolVersion));
        expectRebalance(1, new ArrayList(Collections.singletonList(CONN1)), new ArrayList(Collections.singletonList(TASK1)));
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        expectMemberPoll();
        this.herder.tick();
        if (z) {
            i = 1 + 1;
            expectRebalance(i, Collections.emptyList(), Collections.emptyList());
            expectConfigRefreshAndSnapshot(SNAPSHOT);
            ((WorkerGroupMember) Mockito.doNothing().when(this.member)).requestRejoin();
            this.herder.tick();
        }
        expectRebalance((Collection<String>) Collections.singletonList(CONN1), Collections.emptyList(), (short) 0, i, Collections.emptyList(), Collections.emptyList());
        if (z) {
            expectConfigRefreshAndSnapshot(new ClusterConfigState(i, (SessionKey) null, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)), Collections.emptySet(), Collections.emptySet()));
        }
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).requestRejoin();
        this.herder.tick();
        expectRebalance(i, Collections.singletonList(CONN1), Collections.emptyList());
        this.herder.tick();
        ((Worker) Mockito.verify(this.worker, Mockito.times(2))).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) ArgumentMatchers.any());
        ((Worker) Mockito.verify(this.worker, Mockito.times(2))).connectorTaskConfigs((String) ArgumentMatchers.eq(CONN1), (ConnectorConfig) ArgumentMatchers.eq(this.conn1SinkConfig));
        ((Worker) Mockito.verify(this.worker)).startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED));
        ((Worker) Mockito.verify(this.worker)).stopAndAwaitConnector(CONN1);
        ((WorkerGroupMember) Mockito.verify(this.member, Mockito.times(2))).poll(ArgumentMatchers.anyLong(), (Supplier) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.statusBackingStore, this.configBackingStore, this.worker});
    }

    @Test
    public void testHaltCleansUpWorker() {
        this.herder.halt();
        ((Worker) Mockito.verify(this.worker)).stopAndAwaitConnectors();
        ((Worker) Mockito.verify(this.worker)).stopAndAwaitTasks();
        ((WorkerGroupMember) Mockito.verify(this.member)).stop();
        ((ConfigBackingStore) Mockito.verify(this.configBackingStore)).stop();
        ((StatusBackingStore) Mockito.verify(this.statusBackingStore)).stop();
        ((Worker) Mockito.verify(this.worker)).stop();
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testCreateConnector() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        this.herder.tick();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((DistributedHerder) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, CONN2_CONFIG_INFOS);
            return null;
        }).when(this.herder)).validateConnectorConfig((Map) ArgumentMatchers.eq(CONN2_CONFIG), (Callback) forClass.capture());
        ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).putConnectorConfig((String) ArgumentMatchers.eq(CONN2), (Map) ArgumentMatchers.eq(CONN2_CONFIG), (TargetState) ArgumentMatchers.isNull());
        expectMemberEnsureActive();
        List<String> expectRecordStages = expectRecordStages(this.putConnectorCallback);
        this.herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, this.putConnectorCallback);
        this.herder.tick();
        this.herder.tick();
        this.time.sleep(1000L);
        assertStatistics(3, 1, 100.0d, 1000.0d);
        ((Callback) Mockito.verify(this.putConnectorCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (Herder.Created) ArgumentMatchers.eq(new Herder.Created(true, new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.emptyList(), ConnectorType.SOURCE))));
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore, this.putConnectorCallback});
        Assertions.assertEquals(Arrays.asList("ensuring membership in the cluster", "writing a config for connector sourceB to the config topic"), expectRecordStages);
    }

    @Test
    public void testCreateConnectorWithInitialState() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        this.herder.tick();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((DistributedHerder) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, CONN2_CONFIG_INFOS);
            return null;
        }).when(this.herder)).validateConnectorConfig((Map) ArgumentMatchers.eq(CONN2_CONFIG), (Callback) forClass.capture());
        ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).putConnectorConfig((String) ArgumentMatchers.eq(CONN2), (Map) ArgumentMatchers.eq(CONN2_CONFIG), (TargetState) ArgumentMatchers.eq(TargetState.STOPPED));
        expectMemberEnsureActive();
        List<String> expectRecordStages = expectRecordStages(this.putConnectorCallback);
        this.herder.putConnectorConfig(CONN2, CONN2_CONFIG, TargetState.STOPPED, false, this.putConnectorCallback);
        this.herder.tick();
        this.herder.tick();
        this.time.sleep(1000L);
        assertStatistics(3, 1, 100.0d, 1000.0d);
        ((Callback) Mockito.verify(this.putConnectorCallback)).onCompletion((Throwable) ArgumentMatchers.isNull(), (Herder.Created) ArgumentMatchers.eq(new Herder.Created(true, new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.emptyList(), ConnectorType.SOURCE))));
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore, this.putConnectorCallback});
        Assertions.assertEquals(Arrays.asList("ensuring membership in the cluster", "writing a config for connector sourceB to the config topic"), expectRecordStages);
    }

    @Test
    public void testCreateConnectorConfigBackingStoreError() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        this.herder.tick();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((DistributedHerder) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, CONN2_CONFIG_INFOS);
            return null;
        }).when(this.herder)).validateConnectorConfig((Map) ArgumentMatchers.eq(CONN2_CONFIG), (Callback) forClass.capture());
        ((ConfigBackingStore) Mockito.doThrow(new Throwable[]{new ConnectException("Error writing connector configuration to Kafka")}).when(this.configBackingStore)).putConnectorConfig((String) ArgumentMatchers.eq(CONN2), (Map) ArgumentMatchers.eq(CONN2_CONFIG), (TargetState) ArgumentMatchers.isNull());
        expectMemberEnsureActive();
        List<String> expectRecordStages = expectRecordStages(this.putConnectorCallback);
        this.herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, this.putConnectorCallback);
        this.herder.tick();
        this.herder.tick();
        this.time.sleep(1000L);
        assertStatistics(3, 1, 100.0d, 1000.0d);
        ((Callback) Mockito.verify(this.putConnectorCallback)).onCompletion((Throwable) ArgumentMatchers.any(ConnectException.class), (Herder.Created) ArgumentMatchers.isNull());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore, this.putConnectorCallback});
        Assertions.assertEquals(Arrays.asList("ensuring membership in the cluster", "writing a config for connector sourceB to the config topic"), expectRecordStages);
    }

    @Test
    public void testCreateConnectorFailedValidation() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        HashMap hashMap = new HashMap(CONN2_CONFIG);
        hashMap.remove("name");
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((DistributedHerder) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, CONN2_INVALID_CONFIG_INFOS);
            return null;
        }).when(this.herder)).validateConnectorConfig((Map) ArgumentMatchers.eq(hashMap), (Callback) forClass.capture());
        List<String> expectRecordStages = expectRecordStages(this.putConnectorCallback);
        this.herder.putConnectorConfig(CONN2, hashMap, false, this.putConnectorCallback);
        this.herder.tick();
        expectMemberEnsureActive();
        this.herder.tick();
        this.time.sleep(1000L);
        assertStatistics(3, 1, 100.0d, 1000.0d);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Throwable.class);
        ((Callback) Mockito.verify(this.putConnectorCallback)).onCompletion((Throwable) forClass2.capture(), (Herder.Created) ArgumentMatchers.isNull());
        Assertions.assertInstanceOf(BadRequestException.class, forClass2.getValue());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore, this.putConnectorCallback});
        Assertions.assertEquals(Arrays.asList("awaiting startup", "ensuring membership in the cluster", "reading to the end of the config topic"), expectRecordStages);
    }

    @Test
    public void testConnectorNameConflictsWithWorkerGroupId() {
        HashMap hashMap = new HashMap(CONN2_CONFIG);
        hashMap.put("name", "test-group");
        Assertions.assertEquals(Collections.singletonList("Consumer group for sink connector named test-group conflicts with Connect worker group connect-test-group"), ((ConfigValue) this.herder.validateSinkConnectorConfig((SinkConnector) Mockito.mock(SinkConnector.class), SinkConnectorConfig.configDef(), hashMap).get("name")).errorMessages());
    }

    @Test
    public void testConnectorGroupIdConflictsWithWorkerGroupId() {
        HashMap hashMap = new HashMap(CONN2_CONFIG);
        hashMap.put("consumer.override.group.id", "connect-test-group");
        Map validateSinkConnectorConfig = this.herder.validateSinkConnectorConfig((SinkConnector) Mockito.mock(SinkConnector.class), SinkConnectorConfig.configDef(), hashMap);
        Assertions.assertEquals(Collections.singletonList("Consumer group connect-test-group conflicts with Connect worker group connect-test-group"), ((ConfigValue) validateSinkConnectorConfig.get("consumer.override.group.id")).errorMessages());
        Assertions.assertEquals(Collections.emptyList(), ((ConfigValue) validateSinkConnectorConfig.get("name")).errorMessages());
    }

    @Test
    public void testCreateConnectorAlreadyExists() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((DistributedHerder) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, CONN1_CONFIG_INFOS);
            return null;
        }).when(this.herder)).validateConnectorConfig((Map) ArgumentMatchers.eq(CONN1_CONFIG), (Callback) forClass.capture());
        List<String> expectRecordStages = expectRecordStages(this.putConnectorCallback);
        this.herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, this.putConnectorCallback);
        this.herder.tick();
        expectMemberEnsureActive();
        this.herder.tick();
        this.time.sleep(1000L);
        assertStatistics(3, 1, 100.0d, 1000.0d);
        ((Callback) Mockito.verify(this.putConnectorCallback)).onCompletion((Throwable) ArgumentMatchers.any(AlreadyExistsException.class), (Herder.Created) ArgumentMatchers.isNull());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore, this.putConnectorCallback});
        Assertions.assertEquals(Arrays.asList("awaiting startup", "ensuring membership in the cluster", "reading to the end of the config topic"), expectRecordStages);
    }

    @Test
    public void testDestroyConnector() {
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).removeConnectorConfig(CONN1);
        ((Callback) Mockito.doNothing().when(this.putConnectorCallback)).onCompletion((Throwable) null, new Herder.Created(false, (Object) null));
        List<String> expectRecordStages = expectRecordStages(this.putConnectorCallback);
        this.herder.deleteConnectorConfig(CONN1, this.putConnectorCallback);
        this.herder.tick();
        this.time.sleep(1000L);
        assertStatistics("leaderUrl", false, 3, 1, 100.0d, 1000.0d);
        Mockito.when(this.statusBackingStore.getAllTopics((String) ArgumentMatchers.eq(CONN1))).thenReturn(new HashSet(Arrays.asList(new TopicStatus(FOO_TOPIC, CONN1, 0, this.time.milliseconds()), new TopicStatus(BAR_TOPIC, CONN1, 0, this.time.milliseconds()))));
        ((StatusBackingStore) Mockito.doNothing().when(this.statusBackingStore)).deleteTopic((String) ArgumentMatchers.eq(CONN1), (String) ArgumentMatchers.eq(FOO_TOPIC));
        ((StatusBackingStore) Mockito.doNothing().when(this.statusBackingStore)).deleteTopic((String) ArgumentMatchers.eq(CONN1), (String) ArgumentMatchers.eq(BAR_TOPIC));
        expectRebalance(Collections.singletonList(CONN1), Collections.singletonList(TASK1), (short) 0, 2L, "leader", "leaderUrl", Collections.emptyList(), Collections.emptyList(), 0, true);
        expectConfigRefreshAndSnapshot(ClusterConfigState.EMPTY);
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).requestRejoin();
        this.configUpdateListener.onConnectorConfigRemove(CONN1);
        this.herder.configState = ClusterConfigState.EMPTY;
        this.herder.tick();
        this.time.sleep(1000L);
        assertStatistics("leaderUrl", true, 3, 1, 100.0d, 2100.0d);
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore, this.putConnectorCallback});
        Assertions.assertEquals(Arrays.asList("awaiting startup", "ensuring membership in the cluster", "reading to the end of the config topic", "starting 1 connector(s) and task(s) after a rebalance", "removing the config for connector sourceA from the config topic"), expectRecordStages);
    }

    @Test
    public void testRestartConnector() throws Exception {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        this.herder.tick();
        expectMemberEnsureActive();
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(CONN1);
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnector(CONN1, futureCallback);
        this.herder.tick();
        futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        ((Worker) Mockito.verify(this.worker, Mockito.times(2))).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) ArgumentMatchers.any());
        ((Worker) Mockito.verify(this.worker, Mockito.times(2))).connectorTaskConfigs((String) ArgumentMatchers.eq(CONN1), (ConnectorConfig) ArgumentMatchers.any());
        ((Worker) Mockito.verify(this.worker)).stopAndAwaitConnector(CONN1);
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testRestartUnknownConnector() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        this.herder.tick();
        expectMemberEnsureActive();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnector(CONN2, futureCallback);
        this.herder.tick();
        Assertions.assertEquals(NotFoundException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        })).getCause().getClass());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testRestartConnectorRedirectToLeader() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        this.herder.tick();
        expectMemberEnsureActive();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnector(CONN1, futureCallback);
        this.herder.tick();
        Assertions.assertEquals(NotLeaderException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        })).getCause().getClass());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testRestartConnectorRedirectToOwner() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        this.herder.tick();
        expectMemberEnsureActive();
        Mockito.when(this.member.ownerUrl(CONN1)).thenReturn("ownerUrl");
        this.time.sleep(1000L);
        assertStatistics(3, 1, 100.0d, 1000.0d);
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnector(CONN1, futureCallback);
        this.herder.tick();
        this.time.sleep(2000L);
        assertStatistics(3, 1, 100.0d, 3000.0d);
        ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        });
        Assertions.assertEquals(NotAssignedException.class, executionException.getCause().getClass());
        Assertions.assertEquals("ownerUrl", executionException.getCause().forwardUrl());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testRestartConnectorAndTasksUnknownConnector() {
        RestartRequest restartRequest = new RestartRequest("UnknownConnector", false, true);
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        this.herder.tick();
        expectMemberEnsureActive();
        this.herder.tick();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnectorAndTasks(restartRequest, futureCallback);
        this.herder.tick();
        ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        });
        Assertions.assertInstanceOf(NotFoundException.class, executionException.getCause());
        Assertions.assertTrue(executionException.getMessage().contains("Unknown connector:"));
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testRestartConnectorAndTasksNotLeader() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        this.herder.tick();
        expectMemberEnsureActive();
        this.herder.tick();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnectorAndTasks(new RestartRequest(CONN1, false, true), futureCallback);
        this.herder.tick();
        Assertions.assertInstanceOf(NotLeaderException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        })).getCause());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testRestartConnectorAndTasksUnknownStatus() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        this.herder.tick();
        expectMemberEnsureActive();
        Mockito.when(this.statusBackingStore.get(CONN1)).thenReturn((Object) null);
        RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
        ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).putRestartRequest(restartRequest);
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnectorAndTasks(restartRequest, futureCallback);
        this.herder.tick();
        ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        });
        Assertions.assertInstanceOf(NotFoundException.class, executionException.getCause());
        Assertions.assertTrue(executionException.getMessage().contains("Status for connector"));
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testRestartConnectorAndTasksSuccess() throws Exception {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        this.herder.tick();
        expectMemberEnsureActive();
        RestartPlan restartPlan = (RestartPlan) Mockito.mock(RestartPlan.class);
        ConnectorStateInfo connectorStateInfo = (ConnectorStateInfo) Mockito.mock(ConnectorStateInfo.class);
        Mockito.when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
        RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
        ((DistributedHerder) Mockito.doReturn(Optional.of(restartPlan)).when(this.herder)).buildRestartPlan(restartRequest);
        ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).putRestartRequest(restartRequest);
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartConnectorAndTasks(restartRequest, futureCallback);
        this.herder.tick();
        Assertions.assertEquals(connectorStateInfo, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
        Mockito.verifyNoMoreInteractions(new Object[]{restartPlan, this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testDoRestartConnectorAndTasksEmptyPlan() {
        RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
        ((DistributedHerder) Mockito.doReturn(Optional.empty()).when(this.herder)).buildRestartPlan(restartRequest);
        this.herder.doRestartConnectorAndTasks(restartRequest);
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testDoRestartConnectorAndTasksNoAssignments() {
        ConnectorTaskId connectorTaskId = new ConnectorTaskId(CONN1, 0);
        RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
        RestartPlan restartPlan = (RestartPlan) Mockito.mock(RestartPlan.class);
        Mockito.when(Boolean.valueOf(restartPlan.shouldRestartConnector())).thenReturn(true);
        Mockito.when(restartPlan.taskIdsToRestart()).thenReturn(Collections.singletonList(connectorTaskId));
        ((DistributedHerder) Mockito.doReturn(Optional.of(restartPlan)).when(this.herder)).buildRestartPlan(restartRequest);
        this.herder.assignment = ExtendedAssignment.empty();
        this.herder.doRestartConnectorAndTasks(restartRequest);
        Mockito.verifyNoMoreInteractions(new Object[]{restartPlan, this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testDoRestartConnectorAndTasksOnlyConnector() {
        ConnectorTaskId connectorTaskId = new ConnectorTaskId(CONN1, 0);
        RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
        RestartPlan restartPlan = (RestartPlan) Mockito.mock(RestartPlan.class);
        Mockito.when(Boolean.valueOf(restartPlan.shouldRestartConnector())).thenReturn(true);
        Mockito.when(restartPlan.taskIdsToRestart()).thenReturn(Collections.singletonList(connectorTaskId));
        ((DistributedHerder) Mockito.doReturn(Optional.of(restartPlan)).when(this.herder)).buildRestartPlan(restartRequest);
        this.herder.assignment = (ExtendedAssignment) Mockito.mock(ExtendedAssignment.class);
        Mockito.when(this.herder.assignment.connectors()).thenReturn(Collections.singletonList(CONN1));
        Mockito.when(this.herder.assignment.tasks()).thenReturn(Collections.emptyList());
        this.herder.configState = SNAPSHOT;
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(CONN1);
        ((StatusBackingStore) Mockito.doNothing().when(this.statusBackingStore)).put((ConnectorStatus) ArgumentMatchers.eq(new ConnectorStatus(CONN1, AbstractStatus.State.RESTARTING, WORKER_ID, 0)));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.any(), (Callback) forClass.capture());
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).wakeup();
        this.herder.doRestartConnectorAndTasks(restartRequest);
        Mockito.verifyNoMoreInteractions(new Object[]{restartPlan, this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testDoRestartConnectorAndTasksOnlyTasks() {
        RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
        RestartPlan restartPlan = (RestartPlan) Mockito.mock(RestartPlan.class);
        Mockito.when(Boolean.valueOf(restartPlan.shouldRestartConnector())).thenReturn(true);
        Mockito.when(restartPlan.taskIdsToRestart()).thenReturn(Arrays.asList(TASK0, TASK1, TASK2));
        Mockito.when(Integer.valueOf(restartPlan.totalTaskCount())).thenReturn(3);
        ((DistributedHerder) Mockito.doReturn(Optional.of(restartPlan)).when(this.herder)).buildRestartPlan(restartRequest);
        this.herder.assignment = (ExtendedAssignment) Mockito.mock(ExtendedAssignment.class);
        Mockito.when(this.herder.assignment.connectors()).thenReturn(Collections.emptyList());
        Mockito.when(this.herder.assignment.tasks()).thenReturn(Collections.singletonList(TASK0));
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        this.herder.configState = SNAPSHOT;
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitTasks(Collections.singletonList(TASK0));
        ((StatusBackingStore) Mockito.doNothing().when(this.statusBackingStore)).put((TaskStatus) ArgumentMatchers.eq(new TaskStatus(TASK0, AbstractStatus.State.RESTARTING, WORKER_ID, 0)));
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK0), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.any()))).thenReturn(true);
        this.herder.doRestartConnectorAndTasks(restartRequest);
        Mockito.verifyNoMoreInteractions(new Object[]{restartPlan, this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testDoRestartConnectorAndTasksBoth() {
        ConnectorTaskId connectorTaskId = new ConnectorTaskId(CONN1, 0);
        RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
        RestartPlan restartPlan = (RestartPlan) Mockito.mock(RestartPlan.class);
        Mockito.when(Boolean.valueOf(restartPlan.shouldRestartConnector())).thenReturn(true);
        Mockito.when(restartPlan.taskIdsToRestart()).thenReturn(Collections.singletonList(connectorTaskId));
        Mockito.when(Integer.valueOf(restartPlan.totalTaskCount())).thenReturn(1);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        ((DistributedHerder) Mockito.doReturn(Optional.of(restartPlan)).when(this.herder)).buildRestartPlan(restartRequest);
        this.herder.assignment = (ExtendedAssignment) Mockito.mock(ExtendedAssignment.class);
        Mockito.when(this.herder.assignment.connectors()).thenReturn(Collections.singletonList(CONN1));
        Mockito.when(this.herder.assignment.tasks()).thenReturn(Collections.singletonList(connectorTaskId));
        this.herder.configState = SNAPSHOT;
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(CONN1);
        ((StatusBackingStore) Mockito.doNothing().when(this.statusBackingStore)).put((ConnectorStatus) ArgumentMatchers.eq(new ConnectorStatus(CONN1, AbstractStatus.State.RESTARTING, WORKER_ID, 0)));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.any(), (Callback) forClass.capture());
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).wakeup();
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitTasks(Collections.singletonList(connectorTaskId));
        ((StatusBackingStore) Mockito.doNothing().when(this.statusBackingStore)).put((TaskStatus) ArgumentMatchers.eq(new TaskStatus(TASK0, AbstractStatus.State.RESTARTING, WORKER_ID, 0)));
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK0), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.any()))).thenReturn(true);
        this.herder.doRestartConnectorAndTasks(restartRequest);
        Mockito.verifyNoMoreInteractions(new Object[]{restartPlan, this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testRestartTask() throws Exception {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK0), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.any()))).thenReturn(true);
        this.herder.tick();
        expectMemberEnsureActive();
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitTask(TASK0);
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartTask(TASK0, futureCallback);
        this.herder.tick();
        futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        ((Worker) Mockito.verify(this.worker, Mockito.times(2))).startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK0), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.any());
        ((Worker) Mockito.verify(this.worker)).stopAndAwaitTask(TASK0);
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testRestartUnknownTask() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        this.herder.tick();
        expectMemberEnsureActive();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartTask(new ConnectorTaskId("blah", 0), futureCallback);
        this.herder.tick();
        Assertions.assertEquals(NotFoundException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        })).getCause().getClass());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testRestartTaskRedirectToLeader() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        this.herder.tick();
        expectMemberEnsureActive();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartTask(TASK0, futureCallback);
        this.herder.tick();
        Assertions.assertEquals(NotLeaderException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        })).getCause().getClass());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testRestartTaskRedirectToOwner() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        this.herder.tick();
        Mockito.when(this.member.ownerUrl(TASK0)).thenReturn("ownerUrl");
        expectMemberEnsureActive();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartTask(TASK0, futureCallback);
        this.herder.tick();
        ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        });
        Assertions.assertEquals(NotAssignedException.class, executionException.getCause().getClass());
        Assertions.assertEquals("ownerUrl", executionException.getCause().forwardUrl());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testRequestProcessingOrder() {
        Callable callable = (Callable) Mockito.mock(Callable.class);
        Callback callback = (Callback) Mockito.mock(Callback.class);
        DistributedHerder.DistributedHerderRequest addRequest = this.herder.addRequest(100L, callable, callback);
        DistributedHerder.DistributedHerderRequest addRequest2 = this.herder.addRequest(10L, callable, callback);
        DistributedHerder.DistributedHerderRequest addRequest3 = this.herder.addRequest(200L, callable, callback);
        DistributedHerder.DistributedHerderRequest addRequest4 = this.herder.addRequest(200L, callable, callback);
        Assertions.assertEquals(addRequest2, this.herder.requests.pollFirst());
        Assertions.assertEquals(addRequest, this.herder.requests.pollFirst());
        Assertions.assertEquals(addRequest3, this.herder.requests.pollFirst());
        Assertions.assertEquals(addRequest4, this.herder.requests.pollFirst());
    }

    @Test
    public void testConnectorConfigAdded() {
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(-1L, Collections.emptyList(), Collections.emptyList());
        expectMemberPoll();
        this.herder.tick();
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT);
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).requestRejoin();
        this.configUpdateListener.onConnectorConfigUpdate(CONN1);
        this.herder.tick();
        expectRebalance((Collection<String>) Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.singletonList(CONN1), Collections.emptyList());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        this.herder.tick();
        ((Worker) Mockito.verify(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @ValueSource(shorts = {0, 1, 2})
    @ParameterizedTest
    public void testConnectorConfigDetectedAfterLeaderAlreadyAssigned(short s) {
        this.connectProtocolVersion = s;
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn(Short.valueOf(s));
        expectRebalance(-1L, Collections.emptyList(), Collections.emptyList());
        expectMemberPoll();
        this.herder.tick();
        this.configUpdateListener.onConnectorConfigUpdate(CONN1);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).requestRejoin();
        expectRebalance((Collection<String>) Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.singletonList(CONN1), Collections.emptyList());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        this.herder.tick();
        ((Worker) Mockito.verify(this.worker, Mockito.times(1))).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testConnectorConfigUpdate() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        this.herder.tick();
        expectMemberEnsureActive();
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT);
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(CONN1);
        this.configUpdateListener.onConnectorConfigUpdate(CONN1);
        this.herder.tick();
        this.herder.tick();
        ((Worker) Mockito.verify(this.worker, Mockito.times(2))).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testConnectorConfigUpdateFailedTransformation() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        WorkerConfigTransformer workerConfigTransformer = (WorkerConfigTransformer) Mockito.mock(WorkerConfigTransformer.class);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        this.herder.tick();
        expectMemberEnsureActive();
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)), Collections.emptySet(), Collections.emptySet(), workerConfigTransformer));
        Mockito.when(workerConfigTransformer.transform((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any())).thenThrow(new Throwable[]{new ConfigException("Simulated exception thrown during config transformation")});
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(CONN1);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(ConnectorStatus.class);
        ((StatusBackingStore) Mockito.doNothing().when(this.statusBackingStore)).putSafe((ConnectorStatus) forClass2.capture());
        this.configUpdateListener.onConnectorConfigUpdate(CONN1);
        this.herder.tick();
        this.herder.tick();
        Assertions.assertEquals(CONN1, ((ConnectorStatus) forClass2.getValue()).id());
        Assertions.assertEquals(AbstractStatus.State.FAILED, ((ConnectorStatus) forClass2.getValue()).state());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore, workerConfigTransformer});
    }

    @Test
    public void testConnectorPaused() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        this.herder.tick();
        expectMemberEnsureActive();
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT_PAUSED_CONN1);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock3 -> {
            ((Callback) forClass2.getValue()).onCompletion((Throwable) null, TargetState.PAUSED);
            return null;
        }).when(this.worker)).setTargetState((String) ArgumentMatchers.eq(CONN1), (TargetState) ArgumentMatchers.eq(TargetState.PAUSED), (Callback) forClass2.capture());
        this.configUpdateListener.onConnectorTargetStateChange(CONN1);
        this.herder.tick();
        this.herder.tick();
        ((Worker) Mockito.verify(this.worker)).setTargetState((String) ArgumentMatchers.eq(CONN1), (TargetState) ArgumentMatchers.eq(TargetState.PAUSED), (Callback) ArgumentMatchers.any(Callback.class));
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testConnectorResumed() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT_PAUSED_CONN1);
        expectMemberPoll();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.PAUSED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.PAUSED), (Callback) forClass.capture());
        this.herder.tick();
        expectMemberEnsureActive();
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock2 -> {
            ((Callback) forClass2.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return null;
        }).when(this.worker)).setTargetState((String) ArgumentMatchers.eq(CONN1), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass2.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock3 -> {
            return TASK_CONFIGS;
        });
        this.configUpdateListener.onConnectorTargetStateChange(CONN1);
        this.herder.tick();
        this.herder.tick();
        ((Worker) Mockito.verify(this.worker)).setTargetState((String) ArgumentMatchers.eq(CONN1), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) ArgumentMatchers.any(Callback.class));
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testConnectorStopped() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        this.herder.tick();
        expectMemberEnsureActive();
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT_STOPPED_CONN1);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock3 -> {
            ((Callback) forClass2.getValue()).onCompletion((Throwable) null, TargetState.STOPPED);
            return null;
        }).when(this.worker)).setTargetState((String) ArgumentMatchers.eq(CONN1), (TargetState) ArgumentMatchers.eq(TargetState.STOPPED), (Callback) forClass2.capture());
        this.configUpdateListener.onConnectorTargetStateChange(CONN1);
        this.herder.tick();
        this.herder.tick();
        ((Worker) Mockito.verify(this.worker)).setTargetState((String) ArgumentMatchers.eq(CONN1), (TargetState) ArgumentMatchers.eq(TargetState.STOPPED), (Callback) ArgumentMatchers.any(Callback.class));
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testUnknownConnectorPaused() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0));
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK0), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        this.herder.tick();
        expectMemberEnsureActive();
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT);
        this.configUpdateListener.onConnectorTargetStateChange("unknown-connector");
        this.herder.tick();
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testStopConnector() throws Exception {
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK0), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        this.herder.tick();
        expectMemberEnsureActive();
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).putTaskConfigs(CONN1, Collections.emptyList());
        ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).putTargetState(CONN1, TargetState.STOPPED);
        FutureCallback futureCallback = new FutureCallback();
        this.herder.stopConnector(CONN1, futureCallback);
        this.herder.tick();
        Assertions.assertTrue(futureCallback.isDone(), "Callback should already have been invoked by herder");
        futureCallback.get(0L, TimeUnit.MILLISECONDS);
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testStopConnectorNotLeader() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0));
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK0), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        this.herder.tick();
        expectMemberEnsureActive();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.stopConnector(CONN1, futureCallback);
        this.herder.tick();
        Assertions.assertTrue(futureCallback.isDone(), "Callback should already have been invoked by herder");
        Assertions.assertInstanceOf(NotLeaderException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(0L, TimeUnit.SECONDS);
        }, "Should not be able to handle request to stop connector when not leader")).getCause());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testStopConnectorFailToWriteTaskConfigs() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK0), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        this.herder.tick();
        Throwable connectException = new ConnectException("Could not write task configs to config topic");
        expectMemberEnsureActive();
        ((ConfigBackingStore) Mockito.doThrow(new Throwable[]{connectException}).when(this.configBackingStore)).putTaskConfigs(CONN1, Collections.emptyList());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.stopConnector(CONN1, futureCallback);
        this.herder.tick();
        Assertions.assertTrue(futureCallback.isDone(), "Callback should already have been invoked by herder");
        Assertions.assertEquals(((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(0L, TimeUnit.SECONDS);
        }, "Should not be able to handle request to stop connector when not leader")).getCause(), connectException);
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testConnectorPausedRunningTaskOnly() {
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0));
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK0), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        this.herder.tick();
        expectMemberEnsureActive();
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT_PAUSED_CONN1);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.PAUSED);
            return null;
        }).when(this.worker)).setTargetState((String) ArgumentMatchers.eq(CONN1), (TargetState) ArgumentMatchers.eq(TargetState.PAUSED), (Callback) forClass.capture());
        this.configUpdateListener.onConnectorTargetStateChange(CONN1);
        this.herder.tick();
        ((Worker) Mockito.verify(this.worker)).setTargetState((String) ArgumentMatchers.eq(CONN1), (TargetState) ArgumentMatchers.eq(TargetState.PAUSED), (Callback) ArgumentMatchers.any(Callback.class));
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testConnectorResumedRunningTaskOnly() {
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.singletonList(TASK0));
        expectConfigRefreshAndSnapshot(SNAPSHOT_PAUSED_CONN1);
        expectMemberPoll();
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK0), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.PAUSED)))).thenReturn(true);
        this.herder.tick();
        expectMemberEnsureActive();
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.PAUSED);
            return null;
        }).when(this.worker)).setTargetState((String) ArgumentMatchers.eq(CONN1), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        this.configUpdateListener.onConnectorTargetStateChange(CONN1);
        this.herder.tick();
        this.herder.tick();
        ((Worker) Mockito.verify(this.worker)).setTargetState((String) ArgumentMatchers.eq(CONN1), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) ArgumentMatchers.any(Callback.class));
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testTaskConfigAdded() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(-1L, Collections.emptyList(), Collections.emptyList());
        expectMemberPoll();
        this.herder.tick();
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT);
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).requestRejoin();
        this.configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK0, TASK1, TASK2));
        this.herder.tick();
        expectRebalance((Collection<String>) Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.emptyList(), Collections.singletonList(TASK0));
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK0), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        this.herder.tick();
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testJoinLeaderCatchUpFails() throws Exception {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 1, 1L, "leader", "leaderUrl", Collections.emptyList(), Collections.emptyList(), 0, true);
        ((ConfigBackingStore) Mockito.doThrow(new Throwable[]{new TimeoutException()}).when(this.configBackingStore)).refresh(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).maybeLeaveGroup((String) ArgumentMatchers.eq("taking too long to read the log"));
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).requestRejoin();
        long milliseconds = this.time.milliseconds();
        this.herder.tick();
        Assertions.assertEquals(milliseconds + 100 + 300000, this.time.milliseconds());
        this.time.sleep(1000L);
        assertStatistics("leaderUrl", true, 3, 0, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY);
        long milliseconds2 = this.time.milliseconds();
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.singletonList(TASK1), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        expectMemberPoll();
        this.herder.tick();
        Assertions.assertEquals(milliseconds2 + 100, this.time.milliseconds());
        this.time.sleep(2000L);
        assertStatistics("leaderUrl", false, 3, 1, 100.0d, 2000.0d);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        this.herder.tick();
        ((ConfigBackingStore) Mockito.verify(this.configBackingStore, Mockito.times(2))).refresh(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testJoinLeaderCatchUpRetriesForIncrementalCooperative() throws Exception {
        this.connectProtocolVersion = (short) 1;
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 1);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.singletonList(TASK1), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        assertStatistics(0, 0, 0.0d, Double.POSITIVE_INFINITY);
        this.herder.tick();
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, "leader", "leaderUrl", Collections.singletonList(CONN1), Collections.singletonList(TASK1), 0, true);
        this.time.sleep(2000L);
        assertStatistics(3, 1, 100.0d, 2000.0d);
        this.herder.tick();
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 1, 1L, "leader", "leaderUrl", Collections.emptyList(), Collections.emptyList(), 0, true);
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).requestRejoin();
        ((ConfigBackingStore) Mockito.doThrow(TimeoutException.class).when(this.configBackingStore)).refresh(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).maybeLeaveGroup((String) ArgumentMatchers.eq("taking too long to read the log"));
        int i = 100;
        for (int i2 = 5; i2 >= 5 - 3; i2--) {
            long milliseconds = this.time.milliseconds();
            int i3 = (DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT / 10) / i2;
            this.herder.tick();
            Assertions.assertEquals(milliseconds + i + i3, this.time.milliseconds());
            i = 0;
        }
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, "leader", "leaderUrl", Collections.singletonList(CONN1), Collections.singletonList(TASK1), 0, true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        long milliseconds2 = this.time.milliseconds();
        this.herder.tick();
        Assertions.assertEquals(milliseconds2 + 100, this.time.milliseconds());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testJoinLeaderCatchUpFailsForIncrementalCooperative() throws Exception {
        this.connectProtocolVersion = (short) 1;
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 1);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.singletonList(TASK1), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        Mockito.when(Boolean.valueOf(this.worker.startSourceTask((ConnectorTaskId) ArgumentMatchers.eq(TASK1), (ClusterConfigState) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (TaskStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED)))).thenReturn(true);
        assertStatistics(0, 0, 0.0d, Double.POSITIVE_INFINITY);
        this.herder.tick();
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, "leader", "leaderUrl", Collections.singletonList(CONN1), Collections.singletonList(TASK1), 0, true);
        this.time.sleep(2000L);
        assertStatistics(3, 1, 100.0d, 2000.0d);
        this.herder.tick();
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 1, 1L, "leader", "leaderUrl", Collections.emptyList(), Collections.emptyList(), 0, true);
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).requestRejoin();
        ((ConfigBackingStore) Mockito.doThrow(TimeoutException.class).when(this.configBackingStore)).refresh(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).maybeLeaveGroup((String) ArgumentMatchers.eq("taking too long to read the log"));
        int i = 100;
        for (int i2 = 5; i2 > 0; i2--) {
            long milliseconds = this.time.milliseconds();
            int i3 = (DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT / 10) / i2;
            this.herder.tick();
            Assertions.assertEquals(milliseconds + i + i3, this.time.milliseconds());
            i = 0;
        }
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(ExtendedAssignment.class);
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).revokeAssignment((ExtendedAssignment) forClass2.capture());
        long milliseconds2 = this.time.milliseconds();
        this.herder.tick();
        Assertions.assertEquals(milliseconds2, this.time.milliseconds());
        Assertions.assertEquals(Collections.singleton(CONN1), ((ExtendedAssignment) forClass2.getValue()).connectors());
        Assertions.assertEquals(Collections.singleton(TASK1), ((ExtendedAssignment) forClass2.getValue()).tasks());
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, "leader", "leaderUrl", Collections.singletonList(CONN1), Collections.singletonList(TASK1), 0, true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        this.herder.tick();
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testAccessors() throws Exception {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectMemberPoll();
        WorkerConfigTransformer workerConfigTransformer = (WorkerConfigTransformer) Mockito.mock(WorkerConfigTransformer.class);
        expectConfigRefreshAndSnapshot(new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)), Collections.emptySet(), Collections.emptySet(), workerConfigTransformer));
        FutureCallback futureCallback = new FutureCallback();
        this.herder.connectors(futureCallback);
        FutureCallback futureCallback2 = new FutureCallback();
        this.herder.connectorInfo(CONN1, futureCallback2);
        FutureCallback futureCallback3 = new FutureCallback();
        this.herder.connectorConfig(CONN1, futureCallback3);
        FutureCallback futureCallback4 = new FutureCallback();
        this.herder.taskConfigs(CONN1, futureCallback4);
        this.herder.tick();
        Assertions.assertTrue(futureCallback.isDone());
        Assertions.assertEquals(Collections.singleton(CONN1), futureCallback.get());
        Assertions.assertTrue(futureCallback2.isDone());
        Assertions.assertEquals(new ConnectorInfo(CONN1, CONN1_CONFIG, Arrays.asList(TASK0, TASK1, TASK2), ConnectorType.SOURCE), futureCallback2.get());
        Assertions.assertTrue(futureCallback3.isDone());
        Assertions.assertEquals(CONN1_CONFIG, futureCallback3.get());
        Assertions.assertTrue(futureCallback4.isDone());
        Assertions.assertEquals(Arrays.asList(new TaskInfo(TASK0, TASK_CONFIG), new TaskInfo(TASK1, TASK_CONFIG), new TaskInfo(TASK2, TASK_CONFIG)), futureCallback4.get());
        ((WorkerConfigTransformer) Mockito.verify(workerConfigTransformer, Mockito.never())).transform((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testPutConnectorConfig() throws Exception {
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        Mockito.when(this.member.memberId()).thenReturn("leader");
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList(), true);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.eq(CONN1_CONFIG), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        expectMemberPoll();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.connectorConfig(CONN1, futureCallback);
        this.herder.tick();
        Assertions.assertTrue(futureCallback.isDone());
        Assertions.assertEquals(CONN1_CONFIG, futureCallback.get());
        expectMemberEnsureActive();
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Callback.class);
        ((DistributedHerder) Mockito.doAnswer(invocationOnMock3 -> {
            ((Callback) forClass2.getValue()).onCompletion((Throwable) null, CONN1_CONFIG_INFOS);
            return null;
        }).when(this.herder)).validateConnectorConfig((Map) ArgumentMatchers.eq(CONN1_CONFIG_UPDATED), (Callback) forClass2.capture());
        ((ConfigBackingStore) Mockito.doAnswer(invocationOnMock4 -> {
            this.configUpdateListener.onConnectorConfigUpdate(CONN1);
            return null;
        }).when(this.configBackingStore)).putConnectorConfig((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.eq(CONN1_CONFIG_UPDATED), (TargetState) ArgumentMatchers.isNull());
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(CONN1);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock5 -> {
            ((Callback) forClass3.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.eq(CONN1_CONFIG_UPDATED), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass3.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfigUpdated, invocationOnMock6 -> {
            return TASK_CONFIGS;
        });
        FutureCallback futureCallback2 = new FutureCallback();
        this.herder.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED, true, futureCallback2);
        this.herder.tick();
        Assertions.assertTrue(futureCallback2.isDone());
        Assertions.assertEquals(new Herder.Created(false, new ConnectorInfo(CONN1, CONN1_CONFIG_UPDATED, Arrays.asList(TASK0, TASK1, TASK2), ConnectorType.SOURCE)), futureCallback2.get());
        FutureCallback futureCallback3 = new FutureCallback();
        this.herder.connectorConfig(CONN1, futureCallback3);
        this.herder.tick();
        Assertions.assertTrue(futureCallback3.isDone());
        Assertions.assertEquals(CONN1_CONFIG_UPDATED, futureCallback3.get());
        ((Worker) Mockito.verify(this.worker, Mockito.times(2))).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testPatchConnectorConfigNotFound() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        expectRebalance(0L, Collections.emptyList(), Collections.emptyList(), true);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectConfigRefreshAndSnapshot(new ClusterConfigState(0L, (SessionKey) null, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet()));
        HashMap hashMap = new HashMap();
        hashMap.put("foo1", "baz1");
        FutureCallback futureCallback = new FutureCallback();
        this.herder.patchConnectorConfig(CONN2, hashMap, futureCallback);
        this.herder.tick();
        Assertions.assertTrue(futureCallback.isDone());
        Objects.requireNonNull(futureCallback);
        Assertions.assertInstanceOf(NotFoundException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, futureCallback::get)).getCause());
    }

    @Test
    public void testPatchConnectorConfigNotALeader() {
        Mockito.when(this.member.memberId()).thenReturn("not-leader");
        expectConfigRefreshAndSnapshot(new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 0), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet()));
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList(), false);
        FutureCallback futureCallback = new FutureCallback();
        this.herder.patchConnectorConfig(CONN1, new HashMap(), futureCallback);
        this.herder.tick();
        Assertions.assertTrue(futureCallback.isDone());
        Objects.requireNonNull(futureCallback);
        Assertions.assertInstanceOf(ConnectException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, futureCallback::get)).getCause());
    }

    @Test
    public void testPatchConnectorConfig() throws Exception {
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        HashMap hashMap = new HashMap(CONN1_CONFIG);
        hashMap.put("foo0", "unaffected");
        hashMap.put("foo1", "will-be-changed");
        hashMap.put("foo2", "will-be-removed");
        expectConfigRefreshAndSnapshot(new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 0), Collections.singletonMap(CONN1, hashMap), Collections.singletonMap(CONN1, TargetState.STARTED), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet()));
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectMemberPoll();
        HashMap hashMap2 = new HashMap();
        hashMap2.put("foo1", "changed");
        hashMap2.put("foo2", null);
        hashMap2.put("foo3", "added");
        HashMap hashMap3 = new HashMap(hashMap);
        hashMap3.put("foo0", "unaffected");
        hashMap3.put("foo1", "changed");
        hashMap3.remove("foo2");
        hashMap3.put("foo3", "added");
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList(), true);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((DistributedHerder) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, CONN1_CONFIG_INFOS);
            return null;
        }).when(this.herder)).validateConnectorConfig((Map) ArgumentMatchers.eq(hashMap3), (Callback) forClass.capture());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.patchConnectorConfig(CONN1, hashMap2, futureCallback);
        this.herder.tick();
        Assertions.assertTrue(futureCallback.isDone());
        Assertions.assertEquals(hashMap3, ((ConnectorInfo) ((Herder.Created) futureCallback.get()).result()).config());
        ((ConfigBackingStore) Mockito.verify(this.configBackingStore)).putConnectorConfig((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.eq(hashMap3), (TargetState) ArgumentMatchers.isNull());
        Mockito.verifyNoMoreInteractions(new Object[]{this.configBackingStore});
    }

    @Test
    public void testKeyRotationWhenWorkerBecomesLeader() {
        long j = DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT;
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectMemberPoll();
        this.herder.tick();
        ((WorkerGroupMember) Mockito.verify(this.member)).poll(ArgumentMatchers.eq(Long.MAX_VALUE), (Supplier) ArgumentMatchers.any());
        expectRebalance(2L, Collections.emptyList(), Collections.emptyList());
        SessionKey sessionKey = new SessionKey((SecretKey) Mockito.mock(SecretKey.class), 0L);
        expectConfigRefreshAndSnapshot(new ClusterConfigState(2L, sessionKey, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)), Collections.emptySet(), Collections.emptySet()));
        this.configUpdateListener.onSessionKeyUpdate(sessionKey);
        this.herder.tick();
        ((WorkerGroupMember) Mockito.verify(this.member, Mockito.times(2))).poll(ArgumentMatchers.eq(Long.MAX_VALUE), (Supplier) ArgumentMatchers.any());
        expectRebalance(2L, Collections.emptyList(), Collections.emptyList(), "member", MEMBER_URL, true);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(SessionKey.class);
        ((ConfigBackingStore) Mockito.doAnswer(invocationOnMock -> {
            this.configUpdateListener.onSessionKeyUpdate((SessionKey) forClass.getValue());
            return null;
        }).when(this.configBackingStore)).putSessionKey((SessionKey) forClass.capture());
        this.herder.tick();
        ((WorkerGroupMember) Mockito.verify(this.member)).poll(ArgumentMatchers.eq(j), (Supplier) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testKeyRotationDisabledWhenWorkerBecomesFollower() {
        long j = DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT;
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), "member", MEMBER_URL, true);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        SecretKey secretKey = (SecretKey) Mockito.mock(SecretKey.class);
        Mockito.when(secretKey.getAlgorithm()).thenReturn("HmacSHA256");
        Mockito.when(secretKey.getEncoded()).thenReturn(new byte[32]);
        SessionKey sessionKey = new SessionKey(secretKey, this.time.milliseconds());
        expectConfigRefreshAndSnapshot(new ClusterConfigState(1L, sessionKey, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)), Collections.emptySet(), Collections.emptySet()));
        expectMemberPoll();
        this.configUpdateListener.onSessionKeyUpdate(sessionKey);
        this.herder.tick();
        ((WorkerGroupMember) Mockito.verify(this.member)).poll(AdditionalMatchers.leq(j), (Supplier) ArgumentMatchers.any());
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        this.herder.tick();
        ((WorkerGroupMember) Mockito.verify(this.member)).poll(ArgumentMatchers.eq(Long.MAX_VALUE), (Supplier) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testPutTaskConfigsSignatureNotRequiredV0() {
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Callback callback = (Callback) Mockito.mock(Callback.class);
        List<String> expectRecordStages = expectRecordStages(callback);
        this.herder.putTaskConfigs(CONN1, TASK_CONFIGS, callback, (InternalRequestSignature) null);
        ((WorkerGroupMember) Mockito.verify(this.member)).wakeup();
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, callback});
        Assertions.assertEquals(Collections.singletonList("awaiting startup"), expectRecordStages);
    }

    @Test
    public void testPutTaskConfigsSignatureNotRequiredV1() {
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 1);
        Callback callback = (Callback) Mockito.mock(Callback.class);
        List<String> expectRecordStages = expectRecordStages(callback);
        this.herder.putTaskConfigs(CONN1, TASK_CONFIGS, callback, (InternalRequestSignature) null);
        ((WorkerGroupMember) Mockito.verify(this.member)).wakeup();
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, callback});
        Assertions.assertEquals(Collections.singletonList("awaiting startup"), expectRecordStages);
    }

    @Test
    public void testPutTaskConfigsMissingRequiredSignature() {
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        Callback callback = (Callback) Mockito.mock(Callback.class);
        this.herder.putTaskConfigs(CONN1, TASK_CONFIGS, callback, (InternalRequestSignature) null);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Throwable.class);
        ((Callback) Mockito.verify(callback)).onCompletion((Throwable) forClass.capture(), (Void) ArgumentMatchers.isNull());
        Assertions.assertInstanceOf(BadRequestException.class, forClass.getValue());
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, callback});
    }

    @Test
    public void testPutTaskConfigsDisallowedSignatureAlgorithm() {
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        InternalRequestSignature internalRequestSignature = (InternalRequestSignature) Mockito.mock(InternalRequestSignature.class);
        Mockito.when(internalRequestSignature.keyAlgorithm()).thenReturn("HmacSHA489");
        Callback callback = (Callback) Mockito.mock(Callback.class);
        this.herder.putTaskConfigs(CONN1, TASK_CONFIGS, callback, internalRequestSignature);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Throwable.class);
        ((Callback) Mockito.verify(callback)).onCompletion((Throwable) forClass.capture(), (Void) ArgumentMatchers.isNull());
        Assertions.assertInstanceOf(BadRequestException.class, forClass.getValue());
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, callback});
    }

    @Test
    public void testPutTaskConfigsInvalidSignature() {
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        InternalRequestSignature internalRequestSignature = (InternalRequestSignature) Mockito.mock(InternalRequestSignature.class);
        Mockito.when(internalRequestSignature.keyAlgorithm()).thenReturn("HmacSHA256");
        Mockito.when(Boolean.valueOf(internalRequestSignature.isValid((SecretKey) ArgumentMatchers.any()))).thenReturn(false);
        SessionKey sessionKey = (SessionKey) Mockito.mock(SessionKey.class);
        Mockito.when(sessionKey.key()).thenReturn((SecretKey) Mockito.mock(SecretKey.class));
        Mockito.when(Long.valueOf(sessionKey.creationTimestamp())).thenReturn(Long.valueOf(this.time.milliseconds()));
        this.configUpdateListener.onSessionKeyUpdate(sessionKey);
        Callback callback = (Callback) Mockito.mock(Callback.class);
        this.herder.putTaskConfigs(CONN1, TASK_CONFIGS, callback, internalRequestSignature);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Throwable.class);
        ((Callback) Mockito.verify(callback)).onCompletion((Throwable) forClass.capture(), (Void) ArgumentMatchers.isNull());
        Assertions.assertInstanceOf(ConnectRestException.class, forClass.getValue());
        Assertions.assertEquals(Response.Status.FORBIDDEN.getStatusCode(), ((ConnectRestException) forClass.getValue()).statusCode());
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, callback});
    }

    @Test
    public void putTaskConfigsWorkerStillStarting() {
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        InternalRequestSignature internalRequestSignature = (InternalRequestSignature) Mockito.mock(InternalRequestSignature.class);
        Mockito.when(internalRequestSignature.keyAlgorithm()).thenReturn("HmacSHA256");
        Callback callback = (Callback) Mockito.mock(Callback.class);
        this.herder.putTaskConfigs(CONN1, TASK_CONFIGS, callback, internalRequestSignature);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Throwable.class);
        ((Callback) Mockito.verify(callback)).onCompletion((Throwable) forClass.capture(), (Void) ArgumentMatchers.isNull());
        Assertions.assertInstanceOf(ConnectRestException.class, forClass.getValue());
        Assertions.assertEquals(Response.Status.SERVICE_UNAVAILABLE.getStatusCode(), ((ConnectRestException) forClass.getValue()).statusCode());
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, callback});
    }

    @Test
    public void testPutTaskConfigsValidRequiredSignature() {
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        InternalRequestSignature internalRequestSignature = (InternalRequestSignature) Mockito.mock(InternalRequestSignature.class);
        Mockito.when(internalRequestSignature.keyAlgorithm()).thenReturn("HmacSHA256");
        Mockito.when(Boolean.valueOf(internalRequestSignature.isValid((SecretKey) ArgumentMatchers.any()))).thenReturn(true);
        SessionKey sessionKey = (SessionKey) Mockito.mock(SessionKey.class);
        Mockito.when(sessionKey.key()).thenReturn((SecretKey) Mockito.mock(SecretKey.class));
        Mockito.when(Long.valueOf(sessionKey.creationTimestamp())).thenReturn(Long.valueOf(this.time.milliseconds()));
        this.configUpdateListener.onSessionKeyUpdate(sessionKey);
        Callback callback = (Callback) Mockito.mock(Callback.class);
        List<String> expectRecordStages = expectRecordStages(callback);
        this.herder.putTaskConfigs(CONN1, TASK_CONFIGS, callback, internalRequestSignature);
        ((WorkerGroupMember) Mockito.verify(this.member)).wakeup();
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, callback});
        Assertions.assertEquals(Collections.singletonList("awaiting startup"), expectRecordStages);
    }

    @Test
    public void testFailedToWriteSessionKey() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        ((ConfigBackingStore) Mockito.doThrow(new Throwable[]{new ConnectException("Oh no!")}).when(this.configBackingStore)).putSessionKey((SessionKey) ArgumentMatchers.any(SessionKey.class));
        this.herder.tick();
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).putSessionKey((SessionKey) ArgumentMatchers.any(SessionKey.class));
        this.herder.tick();
        ((WorkerGroupMember) Mockito.verify(this.member, Mockito.times(2))).ensureActive((Supplier) ArgumentMatchers.any());
        ((WorkerGroupMember) Mockito.verify(this.member, Mockito.times(1))).poll(ArgumentMatchers.anyLong(), (Supplier) ArgumentMatchers.any());
        ((ConfigBackingStore) Mockito.verify(this.configBackingStore, Mockito.times(2))).putSessionKey((SessionKey) ArgumentMatchers.any(SessionKey.class));
    }

    @Test
    public void testFailedToReadBackNewlyWrittenSessionKey() throws Exception {
        SecretKey secretKey = (SecretKey) Mockito.mock(SecretKey.class);
        Mockito.when(secretKey.getAlgorithm()).thenReturn("HmacSHA256");
        Mockito.when(secretKey.getEncoded()).thenReturn(new byte[32]);
        SessionKey sessionKey = new SessionKey(secretKey, this.time.milliseconds());
        ClusterConfigState clusterConfigState = new ClusterConfigState(1L, sessionKey, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)), Collections.emptySet(), Collections.emptySet());
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        ((ConfigBackingStore) Mockito.doThrow(new Throwable[]{new ConnectException("Oh no!")}).when(this.configBackingStore)).putSessionKey((SessionKey) ArgumentMatchers.any(SessionKey.class));
        this.herder.tick();
        ((ConfigBackingStore) Mockito.doAnswer(invocationOnMock -> {
            this.configUpdateListener.onSessionKeyUpdate(sessionKey);
            return null;
        }).when(this.configBackingStore)).refresh(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(clusterConfigState);
        this.herder.tick();
        ((WorkerGroupMember) Mockito.verify(this.member, Mockito.times(2))).ensureActive((Supplier) ArgumentMatchers.any());
        ((WorkerGroupMember) Mockito.verify(this.member, Mockito.times(1))).poll(ArgumentMatchers.anyLong(), (Supplier) ArgumentMatchers.any());
        ((ConfigBackingStore) Mockito.verify(this.configBackingStore, Mockito.times(1))).putSessionKey((SessionKey) ArgumentMatchers.any(SessionKey.class));
    }

    @Test
    public void testFenceZombiesInvalidSignature() {
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        InternalRequestSignature internalRequestSignature = (InternalRequestSignature) Mockito.mock(InternalRequestSignature.class);
        Mockito.when(internalRequestSignature.keyAlgorithm()).thenReturn("HmacSHA256");
        Mockito.when(Boolean.valueOf(internalRequestSignature.isValid((SecretKey) ArgumentMatchers.any()))).thenReturn(false);
        SessionKey sessionKey = (SessionKey) Mockito.mock(SessionKey.class);
        Mockito.when(sessionKey.key()).thenReturn((SecretKey) Mockito.mock(SecretKey.class));
        Mockito.when(Long.valueOf(sessionKey.creationTimestamp())).thenReturn(Long.valueOf(this.time.milliseconds()));
        this.configUpdateListener.onSessionKeyUpdate(sessionKey);
        Callback callback = (Callback) Mockito.mock(Callback.class);
        this.herder.fenceZombieSourceTasks(CONN1, callback, internalRequestSignature);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Throwable.class);
        ((Callback) Mockito.verify(callback)).onCompletion((Throwable) forClass.capture(), (Void) ArgumentMatchers.isNull());
        Assertions.assertInstanceOf(ConnectRestException.class, forClass.getValue());
        Assertions.assertEquals(Response.Status.FORBIDDEN.getStatusCode(), ((ConnectRestException) forClass.getValue()).statusCode());
        Mockito.verifyNoMoreInteractions(new Object[]{this.member});
    }

    @Test
    public void testTaskRequestedZombieFencingForwardedToLeader() throws Exception {
        testTaskRequestedZombieFencingForwardingToLeader(true);
    }

    @Test
    public void testTaskRequestedZombieFencingFailedForwardToLeader() throws Exception {
        testTaskRequestedZombieFencingForwardingToLeader(false);
    }

    private void testTaskRequestedZombieFencingForwardingToLeader(boolean z) throws Exception {
        expectHerderStartup();
        ExecutorService executorService = (ExecutorService) Mockito.mock(ExecutorService.class);
        this.herder.forwardRequestExecutor = executorService;
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectMemberPoll();
        ((RestClient) Mockito.doAnswer(invocationOnMock -> {
            if (z) {
                return null;
            }
            throw new ConnectRestException(409, "Rebalance :(");
        }).when(this.restClient)).httpRequest((String) ArgumentMatchers.any(), (String) ArgumentMatchers.eq("PUT"), (HttpHeaders) ArgumentMatchers.isNull(), ArgumentMatchers.isNull(), (SecretKey) ArgumentMatchers.any(), (String) ArgumentMatchers.any());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Runnable.class);
        ((ExecutorService) Mockito.doAnswer(invocationOnMock2 -> {
            ((Runnable) forClass.getValue()).run();
            return null;
        }).when(executorService)).execute((Runnable) forClass.capture());
        expectHerderShutdown();
        ((ExecutorService) Mockito.doNothing().when(executorService)).shutdown();
        Mockito.when(Boolean.valueOf(executorService.awaitTermination(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any()))).thenReturn(true);
        startBackgroundHerder();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.fenceZombieSourceTasks(TASK1, futureCallback);
        if (z) {
            futureCallback.get(10L, TimeUnit.SECONDS);
        } else {
            Assertions.assertInstanceOf(ConnectException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
                futureCallback.get(10L, TimeUnit.SECONDS);
            })).getCause());
        }
        stopBackgroundHerder();
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.worker, executorService});
    }

    @Test
    public void testExternalZombieFencingRequestForAlreadyFencedConnector() throws Exception {
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        testExternalZombieFencingRequestThatRequiresNoPhysicalFencing(exactlyOnceSnapshot(expectNewSessionKey(), TASK_CONFIGS_MAP, Collections.singletonMap(CONN1, 12), Collections.singletonMap(CONN1, 5), Collections.emptySet()), false);
    }

    @Test
    public void testExternalZombieFencingRequestForSingleTaskConnector() throws Exception {
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        testExternalZombieFencingRequestThatRequiresNoPhysicalFencing(exactlyOnceSnapshot(expectNewSessionKey(), Collections.singletonMap(TASK1, TASK_CONFIG), Collections.singletonMap(CONN1, 1), Collections.singletonMap(CONN1, 5), Collections.singleton(CONN1)), true);
    }

    @Test
    public void testExternalZombieFencingRequestForFreshConnector() throws Exception {
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        testExternalZombieFencingRequestThatRequiresNoPhysicalFencing(exactlyOnceSnapshot(expectNewSessionKey(), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.singletonMap(CONN1, 5), Collections.singleton(CONN1)), true);
    }

    private void testExternalZombieFencingRequestThatRequiresNoPhysicalFencing(ClusterConfigState clusterConfigState, boolean z) throws Exception {
        expectHerderStartup();
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        expectConfigRefreshAndSnapshot(clusterConfigState);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        if (z) {
            ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).putTaskCountRecord(CONN1, 1);
        }
        expectHerderShutdown();
        startBackgroundHerder();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.fenceZombieSourceTasks(CONN1, futureCallback);
        futureCallback.get(10L, TimeUnit.SECONDS);
        stopBackgroundHerder();
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.worker, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testExternalZombieFencingRequestImmediateCompletion() throws Exception {
        expectHerderStartup();
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(exactlyOnceSnapshot(expectNewSessionKey(), TASK_CONFIGS_MAP, Collections.singletonMap(CONN1, 2), Collections.singletonMap(CONN1, 5), Collections.singleton(CONN1)));
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        KafkaFuture kafkaFuture = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        KafkaFuture kafkaFuture2 = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaFuture.BiConsumer.class);
        Mockito.when(kafkaFuture2.whenComplete((KafkaFuture.BiConsumer) forClass.capture())).thenAnswer(invocationOnMock -> {
            ((KafkaFuture.BiConsumer) forClass.getValue()).accept((Object) null, (Object) null);
            return null;
        });
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(KafkaFuture.BaseFunction.class);
        Mockito.when(kafkaFuture.thenApply((KafkaFuture.BaseFunction) forClass2.capture())).thenAnswer(invocationOnMock2 -> {
            ((KafkaFuture.BaseFunction) forClass2.getValue()).apply((Object) null);
            return kafkaFuture2;
        });
        Mockito.when(this.worker.fenceZombies((String) ArgumentMatchers.eq(CONN1), ArgumentMatchers.eq(2), (Map) ArgumentMatchers.eq(CONN1_CONFIG))).thenReturn(kafkaFuture);
        ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).putTaskCountRecord(CONN1, 1);
        expectHerderShutdown();
        startBackgroundHerder();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.fenceZombieSourceTasks(CONN1, futureCallback);
        futureCallback.get(10L, TimeUnit.SECONDS);
        stopBackgroundHerder();
        Mockito.verifyNoMoreInteractions(new Object[]{kafkaFuture2, kafkaFuture, this.member, this.worker, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testExternalZombieFencingRequestSynchronousFailure() throws Exception {
        expectHerderStartup();
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(exactlyOnceSnapshot(expectNewSessionKey(), TASK_CONFIGS_MAP, Collections.singletonMap(CONN1, 2), Collections.singletonMap(CONN1, 5), Collections.singleton(CONN1)));
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        Throwable kafkaException = new KafkaException("whoops!");
        Mockito.when(this.worker.fenceZombies((String) ArgumentMatchers.eq(CONN1), ArgumentMatchers.eq(2), (Map) ArgumentMatchers.eq(CONN1_CONFIG))).thenThrow(new Throwable[]{kafkaException});
        expectHerderShutdown();
        startBackgroundHerder();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.fenceZombieSourceTasks(CONN1, futureCallback);
        Assertions.assertEquals(kafkaException, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(10L, TimeUnit.SECONDS);
        })).getCause());
        stopBackgroundHerder();
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.worker, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testExternalZombieFencingRequestAsynchronousFailure() throws Exception {
        expectHerderStartup();
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(exactlyOnceSnapshot(expectNewSessionKey(), TASK_CONFIGS_MAP, Collections.singletonMap(CONN1, 2), Collections.singletonMap(CONN1, 5), Collections.singleton(CONN1)));
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        KafkaFuture kafkaFuture = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        KafkaFuture kafkaFuture2 = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaFuture.BiConsumer.class);
        Mockito.when(this.worker.fenceZombies((String) ArgumentMatchers.eq(CONN1), ArgumentMatchers.eq(2), (Map) ArgumentMatchers.eq(CONN1_CONFIG))).thenReturn(kafkaFuture);
        Mockito.when(kafkaFuture.thenApply((KafkaFuture.BaseFunction) ArgumentMatchers.any(KafkaFuture.BaseFunction.class))).thenReturn(kafkaFuture2);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Mockito.when(kafkaFuture2.whenComplete((KafkaFuture.BiConsumer) forClass.capture())).thenAnswer(invocationOnMock -> {
            countDownLatch.countDown();
            return null;
        });
        expectHerderShutdown();
        startBackgroundHerder();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.fenceZombieSourceTasks(CONN1, futureCallback);
        Assertions.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
        AuthorizationException authorizationException = new AuthorizationException("you didn't say the magic word");
        forClass.getAllValues().forEach(biConsumer -> {
            biConsumer.accept((Object) null, authorizationException);
        });
        Assertions.assertInstanceOf(ConnectException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(10L, TimeUnit.SECONDS);
        })).getCause());
        stopBackgroundHerder();
        Mockito.verifyNoMoreInteractions(new Object[]{kafkaFuture2, kafkaFuture, this.member, this.worker, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testExternalZombieFencingRequestDelayedCompletion() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(CONN1, 5);
        hashMap.put(CONN2, 3);
        hashMap.put("sourceC", 12);
        expectHerderStartup();
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 2);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        SessionKey expectNewSessionKey = expectNewSessionKey();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(CONN1, 2);
        hashMap2.put(CONN2, 3);
        hashMap2.put("sourceC", 5);
        HashMap hashMap3 = new HashMap();
        hashMap3.put(CONN1, 3);
        hashMap3.put(CONN2, 4);
        hashMap3.put("sourceC", 2);
        expectConfigRefreshAndSnapshot(exactlyOnceSnapshot(expectNewSessionKey, TASK_CONFIGS_MAP, hashMap2, hashMap3, new HashSet(Arrays.asList(CONN1, CONN2, "sourceC")), hashMap));
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        HashMap hashMap4 = new HashMap();
        HashMap hashMap5 = new HashMap();
        HashMap hashMap6 = new HashMap();
        hashMap.keySet().forEach(str -> {
            KafkaFuture kafkaFuture = (KafkaFuture) Mockito.mock(KafkaFuture.class);
            KafkaFuture kafkaFuture2 = (KafkaFuture) Mockito.mock(KafkaFuture.class);
            ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaFuture.BiConsumer.class);
            hashMap4.put(str, forClass);
            Mockito.when(kafkaFuture2.whenComplete((KafkaFuture.BiConsumer) forClass.capture())).thenReturn((Object) null);
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(KafkaFuture.BaseFunction.class);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            hashMap5.put(str, forClass2);
            hashMap6.put(str, countDownLatch);
            Mockito.when(kafkaFuture.thenApply((KafkaFuture.BaseFunction) forClass2.capture())).thenAnswer(invocationOnMock -> {
                countDownLatch.countDown();
                return kafkaFuture2;
            });
            Mockito.when(this.worker.fenceZombies((String) ArgumentMatchers.eq(str), ((Integer) ArgumentMatchers.eq((Integer) hashMap2.get(str))).intValue(), (Map) ArgumentMatchers.any())).thenReturn(kafkaFuture);
        });
        hashMap.forEach((str2, num) -> {
            ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).putTaskCountRecord((String) ArgumentMatchers.eq(str2), ((Integer) ArgumentMatchers.eq(num)).intValue());
        });
        expectHerderShutdown();
        startBackgroundHerder();
        ArrayList arrayList = new ArrayList();
        hashMap.forEach((str3, num2) -> {
            List list = (List) IntStream.range(0, num2.intValue()).mapToObj(i -> {
                return new FutureCallback();
            }).collect(Collectors.toList());
            list.forEach(futureCallback -> {
                this.herder.fenceZombieSourceTasks(str3, futureCallback);
            });
            arrayList.addAll(list);
        });
        hashMap6.forEach((str4, countDownLatch) -> {
            try {
                Assertions.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
                ((KafkaFuture.BaseFunction) ((ArgumentCaptor) hashMap5.get(str4)).getValue()).apply((Object) null);
                ((ArgumentCaptor) hashMap4.get(str4)).getAllValues().forEach(biConsumer -> {
                    biConsumer.accept((Object) null, (Object) null);
                });
            } catch (InterruptedException e) {
                Assertions.fail("Unexpectedly interrupted");
            }
        });
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((FutureCallback) it.next()).get(10L, TimeUnit.SECONDS);
        }
        stopBackgroundHerder();
        hashMap.keySet().forEach(str5 -> {
            ((Worker) Mockito.verify(this.worker)).fenceZombies((String) ArgumentMatchers.eq(str5), ((Integer) ArgumentMatchers.eq((Integer) hashMap2.get(str5))).intValue(), (Map) ArgumentMatchers.any());
        });
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.worker, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testVerifyTaskGeneration() {
        HashMap hashMap = new HashMap();
        this.herder.configState = new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), hashMap, Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)), Collections.emptySet(), Collections.emptySet());
        Callback callback = (Callback) Mockito.mock(Callback.class);
        this.herder.assignment = new ExtendedAssignment((short) 2, (short) 0, "leader", "leaderUrl", 0L, Collections.emptySet(), Collections.singleton(TASK1), Collections.emptySet(), Collections.emptySet(), 0);
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(TASK1, 0, callback);
        });
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(TASK1, 1, callback);
        });
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(TASK1, 2, callback);
        });
        hashMap.put(CONN1, 0);
        this.herder.verifyTaskGenerationAndOwnership(TASK1, 0, callback);
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(TASK1, 1, callback);
        });
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(TASK1, 2, callback);
        });
        hashMap.put(CONN1, 1);
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(TASK1, 0, callback);
        });
        this.herder.verifyTaskGenerationAndOwnership(TASK1, 1, callback);
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(TASK1, 2, callback);
        });
        hashMap.put(CONN1, 2);
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(TASK1, 0, callback);
        });
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(TASK1, 1, callback);
        });
        this.herder.verifyTaskGenerationAndOwnership(TASK1, 2, callback);
        hashMap.put(CONN1, 3);
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(TASK1, 0, callback);
        });
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(TASK1, 1, callback);
        });
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(TASK1, 2, callback);
        });
        ConnectorTaskId connectorTaskId = new ConnectorTaskId(CONN2, 0);
        hashMap.put(connectorTaskId.connector(), 1);
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(connectorTaskId, 0, callback);
        });
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(connectorTaskId, 1, callback);
        });
        Assertions.assertThrows(ConnectException.class, () -> {
            this.herder.verifyTaskGenerationAndOwnership(connectorTaskId, 2, callback);
        });
        ((Callback) Mockito.verify(callback, Mockito.times(3))).onCompletion((Throwable) ArgumentMatchers.isNull(), (Void) ArgumentMatchers.isNull());
    }

    @Test
    public void testKeyExceptionDetection() {
        Assertions.assertFalse(this.herder.isPossibleExpiredKeyException(this.time.milliseconds(), new RuntimeException()));
        Assertions.assertFalse(this.herder.isPossibleExpiredKeyException(this.time.milliseconds(), new BadRequestException("")));
        Assertions.assertFalse(this.herder.isPossibleExpiredKeyException(this.time.milliseconds() - TimeUnit.MINUTES.toMillis(2L), new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), "")));
        Assertions.assertTrue(this.herder.isPossibleExpiredKeyException(this.time.milliseconds(), new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), "")));
    }

    @Test
    public void testInconsistentConfigs() {
    }

    @Test
    public void testThreadNames() {
        Assertions.assertTrue(((ThreadPoolExecutor) this.herder.herderExecutor).getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith(DistributedHerder.class.getSimpleName()));
        Assertions.assertTrue(((ThreadPoolExecutor) this.herder.forwardRequestExecutor).getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("ForwardRequestExecutor"));
        Assertions.assertTrue(((ThreadPoolExecutor) this.herder.startAndStopExecutor).getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("StartAndStopExecutor"));
    }

    @Test
    public void testHerderStopServicesClosesUponShutdown() {
        Assertions.assertEquals(1L, this.shutdownCalled.getCount());
        this.herder.stopServices();
        Assertions.assertTrue(this.noneConnectorClientConfigOverridePolicy.isClosed());
        Assertions.assertEquals(0L, this.shutdownCalled.getCount());
    }

    @Test
    public void testPollDurationOnSlowConnectorOperations() {
        this.connectProtocolVersion = (short) 1;
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn(Short.valueOf(this.connectProtocolVersion));
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.singletonList(CONN1), Collections.emptyList(), 20000);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            this.time.sleep(10000L);
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.eq(CONN1_CONFIG), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfig, invocationOnMock2 -> {
            return TASK_CONFIGS;
        });
        this.herder.tick();
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.singletonList(CONN1), Collections.emptyList(), 20000);
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(CONN1);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock3 -> {
            this.time.sleep(10000L);
            ((Callback) forClass2.getValue()).onCompletion((Throwable) null, TargetState.STARTED);
            return true;
        }).when(this.worker)).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.eq(CONN1_CONFIG_UPDATED), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) forClass2.capture());
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfigUpdated, invocationOnMock4 -> {
            return TASK_CONFIGS;
        });
        this.configUpdateListener.onConnectorConfigUpdate(CONN1);
        this.herder.tick();
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.singletonList(CONN1), Collections.emptyList(), 20000);
        expectExecuteTaskReconfiguration(true, this.conn1SinkConfigUpdated, invocationOnMock5 -> {
            this.time.sleep(10000L);
            return TASK_CONFIGS;
        });
        this.herder.tick();
        ((WorkerGroupMember) Mockito.verify(this.member, Mockito.times(3))).poll(AdditionalMatchers.leq(10000L), (Supplier) ArgumentMatchers.any());
        ((Worker) Mockito.verify(this.worker, Mockito.times(2))).startConnector((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.any(), (CloseableConnectorContext) ArgumentMatchers.any(), (ConnectorStatus.Listener) ArgumentMatchers.eq(this.herder), (TargetState) ArgumentMatchers.eq(TargetState.STARTED), (Callback) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.worker, this.configBackingStore});
    }

    @Test
    public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.singletonList(CONN1), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        this.herder.startAndStopExecutor.shutdown();
        DistributedHerder distributedHerder = this.herder;
        Objects.requireNonNull(distributedHerder);
        Assertions.assertThrows(RejectedExecutionException.class, distributedHerder::tick);
    }

    @Test
    public void testTaskReconfigurationRetriesWithConnectorTaskConfigsException() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(Boolean.valueOf(this.worker.isRunning(CONN1))).thenReturn(true);
        Mockito.when(this.worker.getPlugins()).thenReturn(this.plugins);
        Mockito.when(this.worker.connectorTaskConfigs(CONN1, new SinkConnectorConfig(this.plugins, CONN1_CONFIG))).thenThrow(new Throwable[]{new ConnectException("Failed to generate task configs")}).thenThrow(new Throwable[]{new ConnectException("Failed to generate task configs")}).thenReturn(TASK_CONFIGS);
        expectAndVerifyTaskReconfigurationRetries();
    }

    @Test
    public void testTaskReconfigurationNoRetryWithTooManyTasks() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(Boolean.valueOf(this.worker.isRunning(CONN1))).thenReturn(true);
        Mockito.when(this.worker.getPlugins()).thenReturn(this.plugins);
        this.herder.tick();
        ((WorkerGroupMember) Mockito.verify(this.member, Mockito.times(1))).poll(ArgumentMatchers.eq(Long.MAX_VALUE), (Supplier) ArgumentMatchers.any());
        int intValue = MAX_TASKS.intValue() + 5;
        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(this.plugins, CONN1_CONFIG);
        Mockito.when(this.worker.connectorTaskConfigs(CONN1, sinkConnectorConfig)).thenThrow(new Throwable[]{new TooManyTasksException(CONN1, intValue, MAX_TASKS.intValue())});
        this.herder.requestTaskReconfiguration(CONN1);
        this.herder.tick();
        ((Worker) Mockito.verify(this.worker, Mockito.times(1))).connectorTaskConfigs(CONN1, sinkConnectorConfig);
        ((WorkerGroupMember) Mockito.verify(this.member, Mockito.times(2))).poll(ArgumentMatchers.eq(Long.MAX_VALUE), (Supplier) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker});
        this.time.sleep(ErrorHandlingTaskTest.OPERATOR_RETRY_TIMEOUT_MILLIS);
        this.herder.tick();
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker});
        ((WorkerGroupMember) Mockito.verify(this.member, Mockito.times(3))).poll(ArgumentMatchers.eq(Long.MAX_VALUE), (Supplier) ArgumentMatchers.any());
    }

    @Test
    public void testTaskReconfigurationRetriesWithLeaderRequestForwardingException() {
        this.herder = (DistributedHerder) Mockito.mock(DistributedHerder.class, Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS).useConstructor(new Object[]{new DistributedConfig(HERDER_CONFIG), this.worker, WORKER_ID, KAFKA_CLUSTER_ID, this.statusBackingStore, this.configBackingStore, this.member, MEMBER_URL, this.restClient, this.metrics, this.time, this.noneConnectorClientConfigOverridePolicy, Collections.emptyList(), new MockSynchronousExecutor(), new AutoCloseable[0]}));
        DistributedHerder distributedHerder = this.herder;
        Objects.requireNonNull(distributedHerder);
        this.rebalanceListener = new DistributedHerder.RebalanceListener(distributedHerder, this.time);
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(Boolean.valueOf(this.worker.isSinkConnector(CONN1))).thenReturn(Boolean.TRUE);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), false);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(Boolean.valueOf(this.worker.isRunning(CONN1))).thenReturn(true);
        Mockito.when(this.worker.getPlugins()).thenReturn(this.plugins);
        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(this.plugins, CONN1_CONFIG);
        ArrayList arrayList = new ArrayList(TASK_CONFIGS);
        arrayList.add(TASK_CONFIG);
        Mockito.when(this.worker.connectorTaskConfigs(CONN1, sinkConnectorConfig)).thenReturn(arrayList);
        ((RestClient) Mockito.doThrow(new Throwable[]{new ConnectException("Request to leader to reconfigure connector tasks failed")}).doThrow(new Throwable[]{new ConnectException("Request to leader to reconfigure connector tasks failed")}).doNothing().when(this.restClient)).httpRequest((String) ArgumentMatchers.any(), (String) ArgumentMatchers.eq("POST"), (HttpHeaders) ArgumentMatchers.any(), ArgumentMatchers.any(), (SecretKey) ArgumentMatchers.any(), (String) ArgumentMatchers.any());
        expectAndVerifyTaskReconfigurationRetries();
    }

    private void expectAndVerifyTaskReconfigurationRetries() {
        this.herder.tick();
        this.herder.requestTaskReconfiguration(CONN1);
        this.herder.tick();
        this.time.sleep(250L);
        this.herder.tick();
        this.time.sleep(500L);
        this.herder.tick();
        ((WorkerGroupMember) Mockito.verify(this.member, Mockito.times(2))).poll(ArgumentMatchers.eq(Long.MAX_VALUE), (Supplier) ArgumentMatchers.any());
        ((WorkerGroupMember) Mockito.verify(this.member)).poll(ArgumentMatchers.eq(250L), (Supplier) ArgumentMatchers.any());
        ((WorkerGroupMember) Mockito.verify(this.member)).poll(ArgumentMatchers.eq(500L), (Supplier) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.member, this.worker, this.restClient});
    }

    @Test
    public void processRestartRequestsFailureSuppression() {
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).wakeup();
        RestartRequest restartRequest = new RestartRequest(FOO_TOPIC, false, false);
        ((DistributedHerder) Mockito.doThrow(new Throwable[]{new RuntimeException()}).when(this.herder)).buildRestartPlan(restartRequest);
        this.configUpdateListener.onRestartRequest(restartRequest);
        Assertions.assertEquals(1, this.herder.pendingRestartRequests.size());
        this.herder.processRestartRequests();
        Assertions.assertTrue(this.herder.pendingRestartRequests.isEmpty());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void processRestartRequestsDequeue() {
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).wakeup();
        ((DistributedHerder) Mockito.doReturn(Optional.empty()).when(this.herder)).buildRestartPlan((RestartRequest) ArgumentMatchers.any(RestartRequest.class));
        this.configUpdateListener.onRestartRequest(new RestartRequest(FOO_TOPIC, false, false));
        this.configUpdateListener.onRestartRequest(new RestartRequest(BAR_TOPIC, false, false));
        Assertions.assertEquals(2, this.herder.pendingRestartRequests.size());
        this.herder.processRestartRequests();
        Assertions.assertTrue(this.herder.pendingRestartRequests.isEmpty());
    }

    @Test
    public void preserveHighestImpactRestartRequest() {
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).wakeup();
        this.configUpdateListener.onRestartRequest(new RestartRequest(FOO_TOPIC, false, false));
        this.configUpdateListener.onRestartRequest(new RestartRequest(FOO_TOPIC, false, true));
        Assertions.assertEquals(1, this.herder.pendingRestartRequests.size());
        Assertions.assertFalse(((RestartRequest) this.herder.pendingRestartRequests.get(FOO_TOPIC)).onlyFailed());
        Assertions.assertTrue(((RestartRequest) this.herder.pendingRestartRequests.get(FOO_TOPIC)).includeTasks());
        this.configUpdateListener.onRestartRequest(new RestartRequest(FOO_TOPIC, true, false));
        Assertions.assertEquals(1, this.herder.pendingRestartRequests.size());
        Assertions.assertFalse(((RestartRequest) this.herder.pendingRestartRequests.get(FOO_TOPIC)).onlyFailed());
        Assertions.assertTrue(((RestartRequest) this.herder.pendingRestartRequests.get(FOO_TOPIC)).includeTasks());
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testExactlyOnceSourceSupportValidation() {
        this.herder = exactlyOnceHerder();
        HashMap hashMap = new HashMap();
        hashMap.put("exactly.once.support", SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.toString());
        SourceConnector sourceConnector = (SourceConnector) Mockito.mock(SourceConnector.class);
        Mockito.when(sourceConnector.exactlyOnceSupport((Map) ArgumentMatchers.eq(hashMap))).thenReturn(ExactlyOnceSupport.SUPPORTED);
        Assertions.assertEquals(Collections.emptyList(), ((ConfigValue) this.herder.validateSourceConnectorConfig(sourceConnector, SourceConnectorConfig.configDef(), hashMap).get("exactly.once.support")).errorMessages());
    }

    @Test
    public void testExactlyOnceSourceSupportValidationOnUnsupportedConnector() {
        this.herder = exactlyOnceHerder();
        HashMap hashMap = new HashMap();
        hashMap.put("exactly.once.support", SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.toString());
        SourceConnector sourceConnector = (SourceConnector) Mockito.mock(SourceConnector.class);
        Mockito.when(sourceConnector.exactlyOnceSupport((Map) ArgumentMatchers.eq(hashMap))).thenReturn(ExactlyOnceSupport.UNSUPPORTED);
        Assertions.assertEquals(Collections.singletonList("The connector does not support exactly-once semantics with the provided configuration."), ((ConfigValue) this.herder.validateSourceConnectorConfig(sourceConnector, SourceConnectorConfig.configDef(), hashMap).get("exactly.once.support")).errorMessages());
    }

    @Test
    public void testExactlyOnceSourceSupportValidationOnUnknownConnector() {
        this.herder = exactlyOnceHerder();
        HashMap hashMap = new HashMap();
        hashMap.put("exactly.once.support", SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.toString());
        SourceConnector sourceConnector = (SourceConnector) Mockito.mock(SourceConnector.class);
        Mockito.when(sourceConnector.exactlyOnceSupport((Map) ArgumentMatchers.eq(hashMap))).thenReturn((Object) null);
        List errorMessages = ((ConfigValue) this.herder.validateSourceConnectorConfig(sourceConnector, SourceConnectorConfig.configDef(), hashMap).get("exactly.once.support")).errorMessages();
        Assertions.assertFalse(errorMessages.isEmpty());
        Assertions.assertTrue(((String) errorMessages.get(0)).contains("The connector does not implement the API required for preflight validation of exactly-once source support."), "Error message did not contain expected text: " + ((String) errorMessages.get(0)));
        Assertions.assertEquals(1, errorMessages.size());
    }

    @Test
    public void testExactlyOnceSourceSupportValidationHandlesConnectorErrorsGracefully() {
        this.herder = exactlyOnceHerder();
        HashMap hashMap = new HashMap();
        hashMap.put("exactly.once.support", SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.toString());
        SourceConnector sourceConnector = (SourceConnector) Mockito.mock(SourceConnector.class);
        Mockito.when(sourceConnector.exactlyOnceSupport((Map) ArgumentMatchers.eq(hashMap))).thenThrow(new Throwable[]{new NullPointerException("time to add a new unit test :)")});
        List errorMessages = ((ConfigValue) this.herder.validateSourceConnectorConfig(sourceConnector, SourceConnectorConfig.configDef(), hashMap).get("exactly.once.support")).errorMessages();
        Assertions.assertFalse(errorMessages.isEmpty());
        Assertions.assertTrue(((String) errorMessages.get(0)).contains("time to add a new unit test :)"), "Error message did not contain expected text: " + ((String) errorMessages.get(0)));
        Assertions.assertEquals(1, errorMessages.size());
    }

    @Test
    public void testExactlyOnceSourceSupportValidationWhenExactlyOnceNotEnabledOnWorker() {
        HashMap hashMap = new HashMap();
        hashMap.put("exactly.once.support", SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED.toString());
        SourceConnector sourceConnector = (SourceConnector) Mockito.mock(SourceConnector.class);
        Mockito.when(sourceConnector.exactlyOnceSupport((Map) ArgumentMatchers.eq(hashMap))).thenReturn(ExactlyOnceSupport.SUPPORTED);
        Assertions.assertEquals(Collections.singletonList("This worker does not have exactly-once source support enabled."), ((ConfigValue) this.herder.validateSourceConnectorConfig(sourceConnector, SourceConnectorConfig.configDef(), hashMap).get("exactly.once.support")).errorMessages());
    }

    @Test
    public void testExactlyOnceSourceSupportValidationHandlesInvalidValuesGracefully() {
        this.herder = exactlyOnceHerder();
        HashMap hashMap = new HashMap();
        hashMap.put("exactly.once.support", "invalid");
        List errorMessages = ((ConfigValue) this.herder.validateSourceConnectorConfig((SourceConnector) Mockito.mock(SourceConnector.class), SourceConnectorConfig.configDef(), hashMap).get("exactly.once.support")).errorMessages();
        Assertions.assertFalse(errorMessages.isEmpty());
        Assertions.assertTrue(((String) errorMessages.get(0)).contains("String must be one of (case insensitive): "), "Error message did not contain expected text: " + ((String) errorMessages.get(0)));
        Assertions.assertEquals(1, errorMessages.size());
    }

    @Test
    public void testConnectorTransactionBoundaryValidation() {
        this.herder = exactlyOnceHerder();
        HashMap hashMap = new HashMap();
        hashMap.put("transaction.boundary", SourceTask.TransactionBoundary.CONNECTOR.toString());
        SourceConnector sourceConnector = (SourceConnector) Mockito.mock(SourceConnector.class);
        Mockito.when(sourceConnector.canDefineTransactionBoundaries((Map) ArgumentMatchers.eq(hashMap))).thenReturn(ConnectorTransactionBoundaries.SUPPORTED);
        Assertions.assertEquals(Collections.emptyList(), ((ConfigValue) this.herder.validateSourceConnectorConfig(sourceConnector, SourceConnectorConfig.configDef(), hashMap).get("transaction.boundary")).errorMessages());
    }

    @Test
    public void testConnectorTransactionBoundaryValidationOnUnsupportedConnector() {
        this.herder = exactlyOnceHerder();
        HashMap hashMap = new HashMap();
        hashMap.put("transaction.boundary", SourceTask.TransactionBoundary.CONNECTOR.toString());
        SourceConnector sourceConnector = (SourceConnector) Mockito.mock(SourceConnector.class);
        Mockito.when(sourceConnector.canDefineTransactionBoundaries((Map) ArgumentMatchers.eq(hashMap))).thenReturn(ConnectorTransactionBoundaries.UNSUPPORTED);
        List errorMessages = ((ConfigValue) this.herder.validateSourceConnectorConfig(sourceConnector, SourceConnectorConfig.configDef(), hashMap).get("transaction.boundary")).errorMessages();
        Assertions.assertFalse(errorMessages.isEmpty());
        Assertions.assertTrue(((String) errorMessages.get(0)).contains("The connector does not support connector-defined transaction boundaries with the given configuration."), "Error message did not contain expected text: " + ((String) errorMessages.get(0)));
        Assertions.assertEquals(1, errorMessages.size());
    }

    @Test
    public void testConnectorTransactionBoundaryValidationHandlesConnectorErrorsGracefully() {
        this.herder = exactlyOnceHerder();
        HashMap hashMap = new HashMap();
        hashMap.put("transaction.boundary", SourceTask.TransactionBoundary.CONNECTOR.toString());
        SourceConnector sourceConnector = (SourceConnector) Mockito.mock(SourceConnector.class);
        Mockito.when(sourceConnector.canDefineTransactionBoundaries((Map) ArgumentMatchers.eq(hashMap))).thenThrow(new Throwable[]{new ConnectException("Wait I thought we tested for this?")});
        List errorMessages = ((ConfigValue) this.herder.validateSourceConnectorConfig(sourceConnector, SourceConnectorConfig.configDef(), hashMap).get("transaction.boundary")).errorMessages();
        Assertions.assertFalse(errorMessages.isEmpty());
        Assertions.assertTrue(((String) errorMessages.get(0)).contains("Wait I thought we tested for this?"), "Error message did not contain expected text: " + ((String) errorMessages.get(0)));
        Assertions.assertEquals(1, errorMessages.size());
    }

    @Test
    public void testConnectorTransactionBoundaryValidationHandlesInvalidValuesGracefully() {
        this.herder = exactlyOnceHerder();
        HashMap hashMap = new HashMap();
        hashMap.put("transaction.boundary", "CONNECTOR.toString()");
        List errorMessages = ((ConfigValue) this.herder.validateSourceConnectorConfig((SourceConnector) Mockito.mock(SourceConnector.class), SourceConnectorConfig.configDef(), hashMap).get("transaction.boundary")).errorMessages();
        Assertions.assertFalse(errorMessages.isEmpty());
        Assertions.assertTrue(((String) errorMessages.get(0)).contains("String must be one of (case insensitive): "), "Error message did not contain expected text: " + ((String) errorMessages.get(0)));
        Assertions.assertEquals(1, errorMessages.size());
    }

    @Test
    public void testConnectorOffsets() throws Exception {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        Mockito.when(this.statusBackingStore.connectors()).thenReturn(Collections.emptySet());
        expectMemberPoll();
        this.herder.tick();
        Mockito.when(this.configBackingStore.snapshot()).thenReturn(SNAPSHOT);
        ConnectorOffsets connectorOffsets = new ConnectorOffsets(Collections.singletonList(new ConnectorOffset(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue"))));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, connectorOffsets);
            return null;
        }).when(this.worker)).connectorOffsets((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.eq(CONN1_CONFIG), (Callback) forClass.capture());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.connectorOffsets(CONN1, futureCallback);
        this.herder.tick();
        Assertions.assertEquals(connectorOffsets, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
        Mockito.verifyNoMoreInteractions(new Object[]{this.worker, this.member, this.configBackingStore, this.statusBackingStore});
    }

    @Test
    public void testModifyConnectorOffsetsUnknownConnector() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        this.herder.tick();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.modifyConnectorOffsets("connector-does-not-exist", new HashMap(), futureCallback);
        this.herder.tick();
        Assertions.assertInstanceOf(NotFoundException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        })).getCause());
    }

    @Test
    public void testModifyOffsetsConnectorNotInStoppedState() {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT);
        this.herder.tick();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.modifyConnectorOffsets(CONN1, (Map) null, futureCallback);
        this.herder.tick();
        Assertions.assertInstanceOf(BadRequestException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        })).getCause());
    }

    @Test
    public void testModifyOffsetsNotLeader() {
        Mockito.when(this.member.memberId()).thenReturn("member");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), false);
        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
        this.herder.tick();
        FutureCallback futureCallback = new FutureCallback();
        this.herder.modifyConnectorOffsets(CONN1, new HashMap(), futureCallback);
        this.herder.tick();
        Assertions.assertInstanceOf(NotLeaderException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        })).getCause());
    }

    @Test
    public void testModifyOffsetsSinkConnector() throws Exception {
        Mockito.when(this.herder.connectorType((Map) ArgumentMatchers.any())).thenReturn(ConnectorType.SINK);
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
        this.herder.tick();
        Map singletonMap = Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue"));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        Message message = new Message("The offsets for this connector have been altered successfully");
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, message);
            return null;
        }).when(this.worker)).modifyConnectorOffsets((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.eq(CONN1_CONFIG), (Map) ArgumentMatchers.eq(singletonMap), (Callback) forClass.capture());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.alterConnectorOffsets(CONN1, singletonMap, futureCallback);
        this.herder.tick();
        Assertions.assertEquals(message, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
        Assertions.assertEquals("The offsets for this connector have been altered successfully", message.message());
    }

    @Test
    public void testModifyOffsetsSourceConnectorExactlyOnceDisabled() throws Exception {
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        Mockito.when(this.herder.connectorType(ArgumentMatchers.anyMap())).thenReturn(ConnectorType.SOURCE);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
        this.herder.tick();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        Message message = new Message("The offsets for this connector have been reset successfully");
        ((Worker) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion((Throwable) null, message);
            return null;
        }).when(this.worker)).modifyConnectorOffsets((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.eq(CONN1_CONFIG), (Map) ArgumentMatchers.isNull(), (Callback) forClass.capture());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.resetConnectorOffsets(CONN1, futureCallback);
        this.herder.tick();
        Assertions.assertEquals(message, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
        Assertions.assertEquals("The offsets for this connector have been reset successfully", message.message());
    }

    @Test
    public void testModifyOffsetsSourceConnectorExactlyOnceEnabled() throws Exception {
        this.herder = exactlyOnceHerder();
        DistributedHerder distributedHerder = this.herder;
        Objects.requireNonNull(distributedHerder);
        this.rebalanceListener = new DistributedHerder.RebalanceListener(distributedHerder, this.time);
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
        expectMemberPoll();
        this.herder.tick();
        expectMemberEnsureActive();
        Mockito.when(this.herder.connectorType((Map) ArgumentMatchers.any())).thenReturn(ConnectorType.SOURCE);
        KafkaFuture kafkaFuture = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        KafkaFuture kafkaFuture2 = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        Mockito.when(this.worker.fenceZombies(CONN1, SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1).intValue(), CONN1_CONFIG)).thenReturn(kafkaFuture);
        Mockito.when(kafkaFuture.thenApply((KafkaFuture.BaseFunction) ArgumentMatchers.any(KafkaFuture.BaseFunction.class))).thenReturn(kafkaFuture2);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaFuture.BiConsumer.class);
        Mockito.when(kafkaFuture2.whenComplete((KafkaFuture.BiConsumer) forClass.capture())).thenAnswer(invocationOnMock -> {
            ((KafkaFuture.BiConsumer) forClass.getValue()).accept((Object) null, (Object) null);
            return null;
        });
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Callback.class);
        Message message = new Message("The offsets for this connector have been altered successfully");
        Map singletonMap = Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue"));
        ((Worker) Mockito.doAnswer(invocationOnMock2 -> {
            ((Callback) forClass2.getValue()).onCompletion((Throwable) null, message);
            return null;
        }).when(this.worker)).modifyConnectorOffsets((String) ArgumentMatchers.eq(CONN1), (Map) ArgumentMatchers.eq(CONN1_CONFIG), (Map) ArgumentMatchers.eq(singletonMap), (Callback) forClass2.capture());
        FutureCallback futureCallback = new FutureCallback();
        this.herder.alterConnectorOffsets(CONN1, singletonMap, futureCallback);
        this.herder.tick();
        Assertions.assertEquals(message, futureCallback.get(1000L, TimeUnit.MILLISECONDS));
        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
        FutureCallback futureCallback2 = new FutureCallback();
        this.herder.alterConnectorOffsets(CONN1, singletonMap, futureCallback2);
        this.herder.tick();
        Assertions.assertEquals(message, futureCallback2.get(1000L, TimeUnit.MILLISECONDS));
        ((KafkaFuture) Mockito.verify(kafkaFuture2, Mockito.times(2))).whenComplete((KafkaFuture.BiConsumer) ArgumentMatchers.any());
        ((Worker) Mockito.verify(this.worker, Mockito.times(1))).fenceZombies((String) ArgumentMatchers.eq(CONN1), ((Integer) ArgumentMatchers.eq(SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1))).intValue(), (Map) ArgumentMatchers.eq(CONN1_CONFIG));
        Mockito.verifyNoMoreInteractions(new Object[]{kafkaFuture, kafkaFuture2, this.member, this.worker});
    }

    @Test
    public void testModifyOffsetsSourceConnectorExactlyOnceEnabledZombieFencingFailure() {
        this.herder = exactlyOnceHerder();
        DistributedHerder distributedHerder = this.herder;
        Objects.requireNonNull(distributedHerder);
        this.rebalanceListener = new DistributedHerder.RebalanceListener(distributedHerder, this.time);
        Mockito.when(this.member.memberId()).thenReturn("leader");
        Mockito.when(Short.valueOf(this.member.currentProtocolVersion())).thenReturn((short) 0);
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList(), true);
        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
        expectMemberPoll();
        this.herder.tick();
        expectMemberEnsureActive();
        Mockito.when(this.herder.connectorType((Map) ArgumentMatchers.any())).thenReturn(ConnectorType.SOURCE);
        KafkaFuture kafkaFuture = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        KafkaFuture kafkaFuture2 = (KafkaFuture) Mockito.mock(KafkaFuture.class);
        Mockito.when(this.worker.fenceZombies(CONN1, SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1).intValue(), CONN1_CONFIG)).thenReturn(kafkaFuture);
        Mockito.when(kafkaFuture.thenApply((KafkaFuture.BaseFunction) ArgumentMatchers.any(KafkaFuture.BaseFunction.class))).thenReturn(kafkaFuture2);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaFuture.BiConsumer.class);
        Mockito.when(kafkaFuture2.whenComplete((KafkaFuture.BiConsumer) forClass.capture())).thenAnswer(invocationOnMock -> {
            ((KafkaFuture.BiConsumer) forClass.getValue()).accept((Object) null, new ConnectException("Failed to perform zombie fencing"));
            return null;
        });
        FutureCallback futureCallback = new FutureCallback();
        this.herder.resetConnectorOffsets(CONN1, futureCallback);
        this.herder.tick();
        ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            futureCallback.get(1000L, TimeUnit.MILLISECONDS);
        });
        Assertions.assertEquals(ConnectException.class, executionException.getCause().getClass());
        Assertions.assertEquals("Failed to perform zombie fencing for source connector prior to modifying offsets", executionException.getCause().getMessage());
        ((KafkaFuture) Mockito.verify(kafkaFuture2, Mockito.times(2))).whenComplete((KafkaFuture.BiConsumer) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{kafkaFuture, kafkaFuture2, this.member, this.worker});
    }

    private void expectMemberPoll() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Supplier.class);
        ((WorkerGroupMember) Mockito.doAnswer(invocationOnMock -> {
            ((Utils.UncheckedCloseable) ((Supplier) forClass.getValue()).get()).close();
            return null;
        }).when(this.member)).poll(ArgumentMatchers.anyLong(), (Supplier) forClass.capture());
    }

    private void expectMemberEnsureActive() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Supplier.class);
        ((WorkerGroupMember) Mockito.doAnswer(invocationOnMock -> {
            ((Utils.UncheckedCloseable) ((Supplier) forClass.getValue()).get()).close();
            return null;
        }).when(this.member)).ensureActive((Supplier) forClass.capture());
    }

    private void expectRebalance(long j, List<String> list, List<ConnectorTaskId> list2) {
        expectRebalance(j, list, list2, false);
    }

    private void expectRebalance(long j, List<String> list, List<ConnectorTaskId> list2, boolean z) {
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, j, "leader", "leaderUrl", list, list2, 0, z);
    }

    private void expectRebalance(long j, List<String> list, List<ConnectorTaskId> list2, String str, String str2, boolean z) {
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, j, str, str2, list, list2, 0, z);
    }

    private void expectRebalance(Collection<String> collection, List<ConnectorTaskId> list, short s, long j, List<String> list2, List<ConnectorTaskId> list3) {
        expectRebalance(collection, list, s, j, list2, list3, 0);
    }

    private void expectRebalance(Collection<String> collection, List<ConnectorTaskId> list, short s, long j, List<String> list2, List<ConnectorTaskId> list3, int i) {
        expectRebalance(collection, list, s, j, "leader", "leaderUrl", list2, list3, i, false);
    }

    private void expectRebalance(Collection<String> collection, List<ConnectorTaskId> list, short s, long j, String str, String str2, List<String> list2, List<ConnectorTaskId> list3, int i, boolean z) {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Supplier.class);
        ((WorkerGroupMember) Mockito.doAnswer(invocationOnMock -> {
            ((Utils.UncheckedCloseable) ((Supplier) forClass.getValue()).get()).close();
            if (!collection.isEmpty() || !list.isEmpty()) {
                this.rebalanceListener.onRevoked(str, collection, list);
            }
            this.rebalanceListener.onAssigned(this.connectProtocolVersion == 0 ? new ExtendedAssignment(this.connectProtocolVersion, s, str, str2, j, list2, list3, Collections.emptyList(), Collections.emptyList(), 0) : new ExtendedAssignment(this.connectProtocolVersion, s, str, str2, j, list2, list3, new ArrayList(collection), new ArrayList(list), i), 3);
            this.time.sleep(100L);
            return null;
        }).when(this.member)).ensureActive((Supplier) forClass.capture());
        if (z) {
            ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).claimWritePrivileges();
        }
        if (!collection.isEmpty()) {
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnector(it.next());
            }
        }
        if (!list.isEmpty()) {
            ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitTask((ConnectorTaskId) ArgumentMatchers.any(ConnectorTaskId.class));
        }
        if (!collection.isEmpty()) {
            ((StatusBackingStore) Mockito.doNothing().when(this.statusBackingStore)).flush();
        }
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).wakeup();
    }

    private ClusterConfigState exactlyOnceSnapshot(SessionKey sessionKey, Map<ConnectorTaskId, Map<String, String>> map, Map<String, Integer> map2, Map<String, Integer> map3, Set<String> set) {
        HashSet hashSet = new HashSet();
        hashSet.addAll(map2.keySet());
        hashSet.addAll(map3.keySet());
        hashSet.addAll(set);
        return exactlyOnceSnapshot(sessionKey, map, map2, map3, set, (Map) hashSet.stream().collect(Collectors.toMap(Function.identity(), str -> {
            return 1;
        })));
    }

    private ClusterConfigState exactlyOnceSnapshot(SessionKey sessionKey, Map<ConnectorTaskId, Map<String, String>> map, Map<String, Integer> map2, Map<String, Integer> map3, Set<String> set, Map<String, Integer> map4) {
        HashSet hashSet = new HashSet();
        hashSet.addAll(map4.keySet());
        hashSet.addAll(map2.keySet());
        hashSet.addAll(map3.keySet());
        hashSet.addAll(set);
        Map map5 = (Map) hashSet.stream().collect(Collectors.toMap(Function.identity(), str -> {
            return CONN1_CONFIG;
        }));
        Stream distinct = map.keySet().stream().map((v0) -> {
            return v0.connector();
        }).distinct();
        Objects.requireNonNull(map5);
        return new ClusterConfigState(1L, sessionKey, map4, map5, Collections.singletonMap(CONN1, TargetState.STARTED), map, map2, map3, (Map) distinct.filter((v1) -> {
            return r1.containsKey(v1);
        }).collect(Collectors.toMap(Function.identity(), str2 -> {
            return new AppliedConnectorConfig((Map) map5.get(str2));
        })), set, Collections.emptySet());
    }

    private void expectExecuteTaskReconfiguration(boolean z, ConnectorConfig connectorConfig, Answer<List<Map<String, String>>> answer) {
        Mockito.when(Boolean.valueOf(this.worker.isRunning(CONN1))).thenReturn(Boolean.valueOf(z));
        if (z) {
            Mockito.when(this.worker.getPlugins()).thenReturn(this.plugins);
            Mockito.when(this.worker.connectorTaskConfigs(CONN1, connectorConfig)).thenAnswer(answer);
        }
    }

    private SessionKey expectNewSessionKey() {
        SecretKey secretKey = (SecretKey) Mockito.mock(SecretKey.class);
        Mockito.when(secretKey.getAlgorithm()).thenReturn("HmacSHA256");
        Mockito.when(secretKey.getEncoded()).thenReturn(new byte[32]);
        return new SessionKey(secretKey, this.time.milliseconds() + TimeUnit.DAYS.toMillis(1L));
    }

    private void expectConfigRefreshAndSnapshot(ClusterConfigState clusterConfigState) {
        try {
            ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).refresh(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
            Mockito.when(this.configBackingStore.snapshot()).thenReturn(clusterConfigState);
        } catch (TimeoutException e) {
            Assertions.fail("Mocked method should not throw checked exception");
        }
    }

    private void startBackgroundHerder() {
        this.herderExecutor = Executors.newSingleThreadExecutor();
        this.herderFuture = this.herderExecutor.submit((Runnable) this.herder);
    }

    private void stopBackgroundHerder() throws Exception {
        this.herder.stop();
        this.herderExecutor.shutdown();
        Assertions.assertTrue(this.herderExecutor.awaitTermination(10L, TimeUnit.SECONDS), "herder thread did not finish in time");
        this.herderFuture.get();
        Assertions.assertTrue(this.noneConnectorClientConfigOverridePolicy.isClosed());
    }

    private void expectHerderStartup() {
        ((Worker) Mockito.doNothing().when(this.worker)).start();
        ((StatusBackingStore) Mockito.doNothing().when(this.statusBackingStore)).start();
        ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).start();
    }

    private void expectHerderShutdown() {
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitConnectors();
        ((Worker) Mockito.doNothing().when(this.worker)).stopAndAwaitTasks();
        ((WorkerGroupMember) Mockito.doNothing().when(this.member)).stop();
        ((StatusBackingStore) Mockito.doNothing().when(this.statusBackingStore)).stop();
        ((ConfigBackingStore) Mockito.doNothing().when(this.configBackingStore)).stop();
        ((Worker) Mockito.doNothing().when(this.worker)).stop();
    }

    private void assertStatistics(int i, int i2, double d, double d2) {
        assertStatistics(i2 <= 0 ? null : "leaderUrl", false, i, i2, d, d2);
    }

    private void assertStatistics(String str, boolean z, int i, int i2, double d, double d2) {
        ConnectMetrics.MetricGroup metricGroup = this.herder.herderMetrics().metricGroup();
        double currentMetricValueAsDouble = MockConnectMetrics.currentMetricValueAsDouble(this.metrics, metricGroup, "epoch");
        String currentMetricValueAsString = MockConnectMetrics.currentMetricValueAsString(this.metrics, metricGroup, "leader-name");
        double currentMetricValueAsDouble2 = MockConnectMetrics.currentMetricValueAsDouble(this.metrics, metricGroup, "completed-rebalances-total");
        double currentMetricValueAsDouble3 = MockConnectMetrics.currentMetricValueAsDouble(this.metrics, metricGroup, "rebalancing");
        double currentMetricValueAsDouble4 = MockConnectMetrics.currentMetricValueAsDouble(this.metrics, metricGroup, "rebalance-max-time-ms");
        double currentMetricValueAsDouble5 = MockConnectMetrics.currentMetricValueAsDouble(this.metrics, metricGroup, "rebalance-avg-time-ms");
        double currentMetricValueAsDouble6 = MockConnectMetrics.currentMetricValueAsDouble(this.metrics, metricGroup, "time-since-last-rebalance-ms");
        Assertions.assertEquals(i, currentMetricValueAsDouble, 1.0E-4d);
        Assertions.assertEquals(str, currentMetricValueAsString);
        Assertions.assertEquals(i2, currentMetricValueAsDouble2, 1.0E-4d);
        Assertions.assertEquals(z ? 1.0d : 0.0d, currentMetricValueAsDouble3, 1.0E-4d);
        Assertions.assertEquals(d2, currentMetricValueAsDouble6, 1.0E-4d);
        if (d <= 0.0d) {
            Assertions.assertEquals(Double.NaN, currentMetricValueAsDouble4, 1.0E-4d);
            Assertions.assertEquals(Double.NaN, currentMetricValueAsDouble5, 1.0E-4d);
        } else {
            Assertions.assertEquals(d, currentMetricValueAsDouble4, 1.0E-4d);
            Assertions.assertEquals(d, currentMetricValueAsDouble5, 1.0E-4d);
        }
    }

    private static List<String> expectRecordStages(Callback<?> callback) {
        Mockito.when(callback.chainStaging((Callback) ArgumentMatchers.any())).thenCallRealMethod();
        List<String> synchronizedList = Collections.synchronizedList(new ArrayList());
        ((Callback) Mockito.doAnswer(invocationOnMock -> {
            Stage stage = (Stage) invocationOnMock.getArgument(0);
            if (stage == null) {
                return null;
            }
            synchronizedList.add(stage.description());
            return null;
        }).when(callback)).recordStage((Stage) ArgumentMatchers.any());
        return synchronizedList;
    }

    private DistributedHerder exactlyOnceHerder() {
        HashMap hashMap = new HashMap(HERDER_CONFIG);
        hashMap.put("exactly.once.source.support", "enabled");
        return (DistributedHerder) Mockito.mock(DistributedHerder.class, Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS).useConstructor(new Object[]{new DistributedConfig(hashMap), this.worker, WORKER_ID, KAFKA_CLUSTER_ID, this.statusBackingStore, this.configBackingStore, this.member, MEMBER_URL, this.restClient, this.metrics, this.time, this.noneConnectorClientConfigOverridePolicy, Collections.emptyList(), null, new AutoCloseable[0]}));
    }

    static {
        HERDER_CONFIG.put("status.storage.topic", "status-topic");
        HERDER_CONFIG.put("config.storage.topic", "config-topic");
        HERDER_CONFIG.put("bootstrap.servers", "localhost:9092");
        HERDER_CONFIG.put("group.id", "connect-test-group");
        HERDER_CONFIG.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put("offset.storage.topic", "connect-offsets");
        TASK0 = new ConnectorTaskId(CONN1, 0);
        TASK1 = new ConnectorTaskId(CONN1, 1);
        TASK2 = new ConnectorTaskId(CONN1, 2);
        MAX_TASKS = 3;
        CONN1_CONFIG = new HashMap();
        CONN1_CONFIG.put("name", CONN1);
        CONN1_CONFIG.put("tasks.max", MAX_TASKS.toString());
        CONN1_CONFIG.put("topics", String.join(",", FOO_TOPIC, BAR_TOPIC));
        CONN1_CONFIG.put("connector.class", BogusSourceConnector.class.getName());
        CONN1_CONFIG_UPDATED = new HashMap(CONN1_CONFIG);
        CONN1_CONFIG_UPDATED.put("topics", String.join(",", FOO_TOPIC, BAR_TOPIC, BAZ_TOPIC));
        CONN1_CONFIG_INFOS = new ConfigInfos(CONN1, 0, Collections.emptyList(), Collections.emptyList());
        CONN2_CONFIG = new HashMap();
        CONN2_CONFIG.put("name", CONN2);
        CONN2_CONFIG.put("tasks.max", MAX_TASKS.toString());
        CONN2_CONFIG.put("topics", String.join(",", FOO_TOPIC, BAR_TOPIC));
        CONN2_CONFIG.put("connector.class", BogusSourceConnector.class.getName());
        CONN2_CONFIG_INFOS = new ConfigInfos(CONN2, 0, Collections.emptyList(), Collections.emptyList());
        CONN2_INVALID_CONFIG_INFOS = new ConfigInfos(CONN2, 1, Collections.emptyList(), Collections.emptyList());
        TASK_CONFIG = new HashMap();
        TASK_CONFIG.put("task.class", BogusSourceTask.class.getName());
        TASK_CONFIGS = new ArrayList();
        TASK_CONFIGS.add(TASK_CONFIG);
        TASK_CONFIGS.add(TASK_CONFIG);
        TASK_CONFIGS.add(TASK_CONFIG);
        TASK_CONFIGS_MAP = new HashMap<>();
        TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);
        TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
        TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
        SNAPSHOT = new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)), Collections.emptySet(), Collections.emptySet());
        SNAPSHOT_PAUSED_CONN1 = new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.PAUSED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)), Collections.emptySet(), Collections.emptySet());
        SNAPSHOT_STOPPED_CONN1 = new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 0), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STOPPED), Collections.emptyMap(), Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, 10), Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)), Collections.singleton(CONN1), Collections.emptySet());
        SNAPSHOT_STOPPED_CONN1_FENCED = new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 0), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STOPPED), Collections.emptyMap(), Collections.singletonMap(CONN1, 0), Collections.singletonMap(CONN1, 11), Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)), Collections.emptySet(), Collections.emptySet());
        SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1L, (SessionKey) null, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG_UPDATED)), Collections.emptySet(), Collections.emptySet());
        EMPTY_RUNNABLE = () -> {
        };
    }
}
