package org.apache.kafka.connect.storage;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.class */
public class KafkaConfigBackingStoreTest {
    private static final String CLIENT_ID_BASE = "test-client-id-";
    private static final String TOPIC = "connect-configs";
    private static final short TOPIC_REPLICATION_FACTOR = 5;
    private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap();
    private static final List<String> CONNECTOR_IDS;
    private static final List<String> CONNECTOR_CONFIG_KEYS;
    private static final List<String> COMMIT_TASKS_CONFIG_KEYS;
    private static final List<String> TARGET_STATE_KEYS;
    private static final List<String> CONNECTOR_TASK_COUNT_RECORD_KEYS;
    private static final String CONNECTOR_1_NAME = "connector1";
    private static final String CONNECTOR_2_NAME = "connector2";
    private static final List<String> RESTART_CONNECTOR_KEYS;
    private static final List<ConnectorTaskId> TASK_IDS;
    private static final List<String> TASK_CONFIG_KEYS;
    private static final List<Map<String, String>> SAMPLE_CONFIGS;
    private static final List<Struct> TASK_CONFIG_STRUCTS;
    private static final Struct ONLY_FAILED_MISSING_STRUCT;
    private static final Struct INCLUDE_TASKS_MISSING_STRUCT;
    private static final List<Struct> RESTART_REQUEST_STRUCTS;
    private static final List<Struct> CONNECTOR_CONFIG_STRUCTS;
    private static final Struct TARGET_STATE_PAUSED;
    private static final Struct TARGET_STATE_PAUSED_LEGACY;
    private static final Struct TARGET_STATE_STOPPED;
    private static final List<Struct> CONNECTOR_TASK_COUNT_RECORD_STRUCTS;
    private static final List<byte[]> CONFIGS_SERIALIZED;
    private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR;
    private static final Struct TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR;
    private static final List<byte[]> TARGET_STATES_SERIALIZED;

    @Mock
    private Converter converter;

    @Mock
    private ConfigBackingStore.UpdateListener configUpdateListener;
    private DistributedConfig config;

    @Mock
    KafkaBasedLog<String, byte[]> configLog;

    @Mock
    Producer<String, byte[]> fencableProducer;

    @Mock
    Future<RecordMetadata> producerFuture;
    private KafkaConfigBackingStore configStorage;
    private Map<String, String> props = new HashMap(DEFAULT_CONFIG_STORAGE_PROPS);
    private final ArgumentCaptor<String> capturedTopic = ArgumentCaptor.forClass(String.class);
    private final ArgumentCaptor<Map<String, Object>> capturedConsumerProps = ArgumentCaptor.forClass(Map.class);
    private final ArgumentCaptor<Map<String, Object>> capturedProducerProps = ArgumentCaptor.forClass(Map.class);
    private final ArgumentCaptor<Supplier<TopicAdmin>> capturedAdminSupplier = ArgumentCaptor.forClass(Supplier.class);
    private final ArgumentCaptor<NewTopic> capturedNewTopic = ArgumentCaptor.forClass(NewTopic.class);
    private final ArgumentCaptor<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = ArgumentCaptor.forClass(Callback.class);
    private final MockTime time = new MockTime();
    private long logOffset = 0;

    private void createStore() {
        this.config = (DistributedConfig) Mockito.spy(new DistributedConfig(this.props));
        ((DistributedConfig) Mockito.doReturn("test-cluster").when(this.config)).kafkaClusterId();
        this.configStorage = (KafkaConfigBackingStore) Mockito.spy(new KafkaConfigBackingStore(this.converter, this.config, (WorkerConfigTransformer) null, () -> {
            return null;
        }, CLIENT_ID_BASE, this.time));
        this.configStorage.setConfigLog(this.configLog);
        this.configStorage.setUpdateListener(this.configUpdateListener);
    }

    @BeforeEach
    public void setUp() {
        createStore();
    }

    @Test
    public void testStartStop() {
        this.props.put("config.storage.min.insync.replicas", "3");
        this.props.put("config.storage.max.message.bytes", "1001");
        createStore();
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        Assertions.assertEquals(TOPIC, this.capturedTopic.getValue());
        Assertions.assertEquals("org.apache.kafka.common.serialization.StringSerializer", ((Map) this.capturedProducerProps.getValue()).get("key.serializer"));
        Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", ((Map) this.capturedProducerProps.getValue()).get("value.serializer"));
        Assertions.assertEquals("org.apache.kafka.common.serialization.StringDeserializer", ((Map) this.capturedConsumerProps.getValue()).get("key.deserializer"));
        Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", ((Map) this.capturedConsumerProps.getValue()).get("value.deserializer"));
        Assertions.assertEquals(TOPIC, ((NewTopic) this.capturedNewTopic.getValue()).name());
        Assertions.assertEquals(1, ((NewTopic) this.capturedNewTopic.getValue()).numPartitions());
        Assertions.assertEquals((short) 5, ((NewTopic) this.capturedNewTopic.getValue()).replicationFactor());
        Assertions.assertEquals("3", ((NewTopic) this.capturedNewTopic.getValue()).configs().get("min.insync.replicas"));
        Assertions.assertEquals("1001", ((NewTopic) this.capturedNewTopic.getValue()).configs().get("max.message.bytes"));
        this.configStorage.start();
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).start();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testSnapshotCannotMutateInternalState() {
        this.props.put("config.storage.min.insync.replicas", "3");
        this.props.put("config.storage.max.message.bytes", "1001");
        createStore();
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertNotSame(snapshot.connectorTaskCounts, this.configStorage.connectorTaskCounts);
        Assertions.assertNotSame(snapshot.connectorConfigs, this.configStorage.connectorConfigs);
        Assertions.assertNotSame(snapshot.connectorTargetStates, this.configStorage.connectorTargetStates);
        Assertions.assertNotSame(snapshot.taskConfigs, this.configStorage.taskConfigs);
        Assertions.assertNotSame(snapshot.connectorTaskCountRecords, this.configStorage.connectorTaskCountRecords);
        Assertions.assertNotSame(snapshot.connectorTaskConfigGenerations, this.configStorage.connectorTaskConfigGenerations);
        Assertions.assertNotSame(snapshot.connectorsPendingFencing, this.configStorage.connectorsPendingFencing);
        Assertions.assertNotSame(snapshot.inconsistentConnectors, this.configStorage.inconsistent);
    }

    @Test
    public void testPutConnectorConfig() throws Exception {
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(-1L, snapshot.offset());
        Assertions.assertNull(snapshot.connectorConfig(CONNECTOR_IDS.get(0)));
        Assertions.assertNull(snapshot.connectorConfig(CONNECTOR_IDS.get(1)));
        final String str = CONNECTOR_CONFIG_KEYS.get(1);
        final String str2 = TARGET_STATE_KEYS.get(1);
        ((KafkaBasedLog) Mockito.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)))).doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1)))).doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() { // from class: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.1
            {
                put(str, null);
                put(str2, null);
            }
        })).when(this.configLog)).readToEnd();
        expectConvertWriteRead(CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), "properties", SAMPLE_CONFIGS.get(0));
        this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), (TargetState) null);
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assertions.assertEquals(1L, snapshot2.offset());
        Assertions.assertEquals(SAMPLE_CONFIGS.get(0), snapshot2.connectorConfig(CONNECTOR_IDS.get(0)));
        Assertions.assertNull(snapshot2.connectorConfig(CONNECTOR_IDS.get(1)));
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
        expectConvertWriteRead(CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), "properties", SAMPLE_CONFIGS.get(1));
        this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1), (TargetState) null);
        ClusterConfigState snapshot3 = this.configStorage.snapshot();
        Assertions.assertEquals(2L, snapshot3.offset());
        Assertions.assertEquals(SAMPLE_CONFIGS.get(0), snapshot3.connectorConfig(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(1), snapshot3.connectorConfig(CONNECTOR_IDS.get(1)));
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
        Mockito.when(this.producerFuture.get(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenReturn((Object) null);
        Mockito.when(this.converter.toConnectData(TOPIC, (byte[]) null)).thenReturn(new SchemaAndValue((Schema) null, (Object) null));
        Mockito.when(this.configLog.sendWithReceipt((String) AdditionalMatchers.or((String) Mockito.eq(str), (String) Mockito.eq(str2)), (byte[]) Mockito.isNull())).thenReturn(this.producerFuture);
        this.configStorage.removeConnectorConfig(CONNECTOR_IDS.get(1));
        ClusterConfigState snapshot4 = this.configStorage.snapshot();
        Assertions.assertEquals(4L, snapshot4.offset());
        Assertions.assertEquals(SAMPLE_CONFIGS.get(0), snapshot4.connectorConfig(CONNECTOR_IDS.get(0)));
        Assertions.assertNull(snapshot4.connectorConfig(CONNECTOR_IDS.get(1)));
        Assertions.assertNull(snapshot4.targetState(CONNECTOR_IDS.get(1)));
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onConnectorConfigRemove(CONNECTOR_IDS.get(1));
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testPutConnectorConfigWithTargetState() {
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(-1L, snapshot.offset());
        Assertions.assertNull(snapshot.connectorConfig(CONNECTOR_IDS.get(0)));
        Assertions.assertNull(snapshot.targetState(CONNECTOR_IDS.get(0)));
        ((KafkaBasedLog) Mockito.doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() { // from class: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.2
            {
                put(KafkaConfigBackingStoreTest.TARGET_STATE_KEYS.get(0), KafkaConfigBackingStoreTest.TARGET_STATES_SERIALIZED.get(2));
                put(KafkaConfigBackingStoreTest.CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigBackingStoreTest.CONFIGS_SERIALIZED.get(0));
            }
        })).when(this.configLog)).readToEnd();
        expectConvertWriteRead(TARGET_STATE_KEYS.get(0), KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATES_SERIALIZED.get(2), "state.v2", TargetState.STOPPED.name());
        expectConvertWriteRead(CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), "properties", SAMPLE_CONFIGS.get(0));
        this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), TargetState.STOPPED);
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assertions.assertEquals(2L, snapshot2.offset());
        Assertions.assertEquals(TargetState.STOPPED, snapshot2.targetState(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(0), snapshot2.connectorConfig(CONNECTOR_IDS.get(0)));
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener, Mockito.never())).onConnectorTargetStateChange(ArgumentMatchers.anyString());
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testPutConnectorConfigProducerError() throws Exception {
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        Mockito.when(this.converter.fromConnectData(TOPIC, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0))).thenReturn(CONFIGS_SERIALIZED.get(0));
        Mockito.when(this.configLog.sendWithReceipt(ArgumentMatchers.anyString(), (byte[]) ArgumentMatchers.any(byte[].class))).thenReturn(this.producerFuture);
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(-1L, snapshot.offset());
        Assertions.assertEquals(0, snapshot.connectors().size());
        ExecutionException executionException = new ExecutionException((Throwable) new TopicAuthorizationException(Collections.singleton("test")));
        Mockito.when(this.producerFuture.get(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenThrow(new Throwable[]{executionException});
        ConnectException assertThrows = Assertions.assertThrows(ConnectException.class, () -> {
            this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), (TargetState) null);
        });
        Assertions.assertTrue(assertThrows.getMessage().contains("Error writing connector configuration to Kafka"));
        Assertions.assertEquals(executionException, assertThrows.getCause());
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testRemoveConnectorConfigSlowProducer() throws Exception {
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        Future future = (Future) Mockito.mock(Future.class);
        Future future2 = (Future) Mockito.mock(Future.class);
        Mockito.when(this.configLog.sendWithReceipt(ArgumentMatchers.anyString(), (byte[]) ArgumentMatchers.isNull())).thenReturn(future).thenReturn(future2);
        Mockito.when((RecordMetadata) future.get(ArgumentMatchers.eq(30000L), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenAnswer(invocationOnMock -> {
            this.time.sleep(29000L);
            return null;
        });
        Mockito.when((RecordMetadata) future2.get(ArgumentMatchers.eq(1000L), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenAnswer(invocationOnMock2 -> {
            this.time.sleep(1000L);
            return null;
        });
        Future future3 = (Future) Mockito.mock(Future.class);
        Mockito.when(this.configLog.readToEnd()).thenReturn(future3);
        Mockito.when((Void) future3.get(ArgumentMatchers.eq(0L), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenReturn((Object) null);
        this.configStorage.removeConnectorConfig("test-connector");
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testWritePrivileges() throws Exception {
        this.props.put("exactly.once.source.support", "preparing");
        createStore();
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        Mockito.when(this.converter.fromConnectData(TOPIC, KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0))).thenReturn(CONFIGS_SERIALIZED.get(0));
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
        });
        ((KafkaConfigBackingStore) Mockito.doReturn(this.fencableProducer).when(this.configStorage)).createFencableProducer();
        Mockito.when(this.fencableProducer.send((ProducerRecord) ArgumentMatchers.any(ProducerRecord.class))).thenReturn((Object) null);
        ((KafkaBasedLog) Mockito.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)))).doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)))).when(this.configLog)).readToEnd();
        Mockito.when(this.converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(0))).thenReturn(new SchemaAndValue((Schema) null, structToMap(CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0))));
        this.configStorage.claimWritePrivileges();
        this.configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
        ((Producer) Mockito.verify(this.fencableProducer)).beginTransaction();
        ((Producer) Mockito.verify(this.fencableProducer)).commitTransaction();
        Mockito.when(this.converter.fromConnectData(TOPIC, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0))).thenReturn(CONFIGS_SERIALIZED.get(1));
        ((Producer) Mockito.doThrow(new Throwable[]{new ProducerFencedException("Better luck next time")}).doNothing().when(this.fencableProducer)).commitTransaction();
        Assertions.assertThrows(PrivilegedWriteException.class, () -> {
            this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), (TargetState) null);
        });
        ((Producer) Mockito.verify(this.fencableProducer, Mockito.times(2))).beginTransaction();
        ((Producer) Mockito.verify(this.fencableProducer)).close(Duration.ZERO);
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), (TargetState) null);
        });
        Mockito.when(this.converter.fromConnectData(TOPIC, KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED)).thenReturn(CONFIGS_SERIALIZED.get(1));
        Mockito.when(this.configLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1))).thenReturn(this.producerFuture);
        Mockito.when(this.producerFuture.get(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenReturn((Object) null);
        this.configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED);
        Mockito.when(this.converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(2))).thenReturn(new SchemaAndValue((Schema) null, structToMap(CONNECTOR_CONFIG_STRUCTS.get(0))));
        this.configStorage.claimWritePrivileges();
        this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), (TargetState) null);
        ((Producer) Mockito.verify(this.fencableProducer, Mockito.times(3))).beginTransaction();
        ((Producer) Mockito.verify(this.fencableProducer, Mockito.times(3))).commitTransaction();
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
        ((KafkaConfigBackingStore) Mockito.verify(this.configStorage, Mockito.times(2))).createFencableProducer();
        ((Producer) Mockito.verify(this.fencableProducer, Mockito.times(2))).close(Duration.ZERO);
    }

    @Test
    public void testRestoreTargetStateUnexpectedDeletion() {
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), null);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        this.logOffset = 5L;
        expectStart(asList, linkedHashMap);
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(5L, snapshot.offset());
        Assertions.assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList(snapshot.connectors()));
        Assertions.assertEquals(TargetState.STARTED, snapshot.targetState(CONNECTOR_IDS.get(0)));
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testRestoreTargetState() {
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(1), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), new RecordHeaders(), Optional.empty()));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED_LEGACY);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), TARGET_STATE_STOPPED);
        this.logOffset = 6L;
        expectStart(asList, linkedHashMap);
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(6L, snapshot.offset());
        Assertions.assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList(snapshot.connectors()));
        Assertions.assertEquals(TargetState.PAUSED, snapshot.targetState(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(TargetState.STOPPED, snapshot.targetState(CONNECTOR_IDS.get(1)));
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testRestore() {
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 6L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(1), CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 7L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 8L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(8), new RecordHeaders(), Optional.empty()));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(4), CONNECTOR_CONFIG_STRUCTS.get(1));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(6), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(1));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(7), CONNECTOR_CONFIG_STRUCTS.get(2));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(8), TASK_CONFIG_STRUCTS.get(1));
        this.logOffset = 9L;
        expectStart(asList, linkedHashMap);
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(this.logOffset, snapshot.offset());
        Assertions.assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList(snapshot.connectors()));
        Assertions.assertEquals(TargetState.STARTED, snapshot.targetState(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(2), snapshot.connectorConfig(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), snapshot.tasks(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(0), snapshot.taskConfig(TASK_IDS.get(0)));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(0), snapshot.taskConfig(TASK_IDS.get(1)));
        Assertions.assertEquals(9, snapshot.taskCountRecord(CONNECTOR_IDS.get(1)).intValue());
        Assertions.assertEquals(Collections.EMPTY_SET, snapshot.inconsistentConnectors());
        Assertions.assertEquals(Collections.singleton(CONNECTOR_1_NAME), snapshot.connectorsPendingFencing);
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testRestoreConnectorDeletion() {
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), new RecordHeaders(), Optional.empty()));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), null);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(4), null);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        this.logOffset = 6L;
        expectStart(asList, linkedHashMap);
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(6L, snapshot.offset());
        Assertions.assertTrue(snapshot.connectors().isEmpty());
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testRestoreZeroTasks() {
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 6L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 7L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty()));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), CONNECTOR_CONFIG_STRUCTS.get(2));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
        this.logOffset = 8L;
        expectStart(asList, linkedHashMap);
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(8L, snapshot.offset());
        Assertions.assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList(snapshot.connectors()));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(2), snapshot.connectorConfig(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(Collections.emptyList(), snapshot.tasks(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(Collections.EMPTY_SET, snapshot.inconsistentConnectors());
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testRecordToRestartRequest() {
        ConsumerRecord consumerRecord = new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty());
        Struct struct = RESTART_REQUEST_STRUCTS.get(0);
        RestartRequest recordToRestartRequest = this.configStorage.recordToRestartRequest(consumerRecord, new SchemaAndValue(struct.schema(), structToMap(struct)));
        Assertions.assertEquals(CONNECTOR_1_NAME, recordToRestartRequest.connectorName());
        Assertions.assertEquals(struct.getBoolean("include-tasks"), Boolean.valueOf(recordToRestartRequest.includeTasks()));
        Assertions.assertEquals(struct.getBoolean("only-failed"), Boolean.valueOf(recordToRestartRequest.onlyFailed()));
    }

    @Test
    public void testRecordToRestartRequestOnlyFailedInconsistent() {
        ConsumerRecord consumerRecord = new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty());
        Struct struct = ONLY_FAILED_MISSING_STRUCT;
        RestartRequest recordToRestartRequest = this.configStorage.recordToRestartRequest(consumerRecord, new SchemaAndValue(struct.schema(), structToMap(struct)));
        Assertions.assertEquals(CONNECTOR_1_NAME, recordToRestartRequest.connectorName());
        Assertions.assertEquals(struct.getBoolean("include-tasks"), Boolean.valueOf(recordToRestartRequest.includeTasks()));
        Assertions.assertFalse(recordToRestartRequest.onlyFailed());
    }

    @Test
    public void testRecordToRestartRequestIncludeTasksInconsistent() {
        ConsumerRecord consumerRecord = new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty());
        Struct struct = INCLUDE_TASKS_MISSING_STRUCT;
        RestartRequest recordToRestartRequest = this.configStorage.recordToRestartRequest(consumerRecord, new SchemaAndValue(struct.schema(), structToMap(struct)));
        Assertions.assertEquals(CONNECTOR_1_NAME, recordToRestartRequest.connectorName());
        Assertions.assertFalse(recordToRestartRequest.includeTasks());
        Assertions.assertEquals(struct.getBoolean("only-failed"), Boolean.valueOf(recordToRestartRequest.onlyFailed()));
    }

    @Test
    public void testFencableProducerPropertiesOverrideUserSuppliedValues() {
        this.props.put("exactly.once.source.support", "preparing");
        this.props.put("group.id", "my-other-connect-cluster");
        this.props.put("transactional.id", "my-custom-transactional-id");
        this.props.put("enable.idempotence", "false");
        createStore();
        Map fencableProducerProps = this.configStorage.fencableProducerProps(this.config);
        Assertions.assertEquals("connect-cluster-" + "my-other-connect-cluster", fencableProducerProps.get("transactional.id"));
        Assertions.assertEquals("true", fencableProducerProps.get("enable.idempotence"));
    }

    @Test
    public void testConsumerPropertiesDoNotOverrideUserSuppliedValuesWithoutExactlyOnceSourceEnabled() {
        this.props.put("exactly.once.source.support", "preparing");
        this.props.put("isolation.level", IsolationLevel.READ_UNCOMMITTED.toString());
        createStore();
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        Assertions.assertEquals(IsolationLevel.READ_UNCOMMITTED.toString(), ((Map) this.capturedConsumerProps.getValue()).get("isolation.level"));
    }

    @Test
    public void testClientIds() {
        this.props = new HashMap(DEFAULT_CONFIG_STORAGE_PROPS);
        this.props.put("exactly.once.source.support", "enabled");
        createStore();
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        Map fencableProducerProps = this.configStorage.fencableProducerProps(this.config);
        Assertions.assertEquals("test-client-id-configs", ((Map) this.capturedProducerProps.getValue()).get("client.id"));
        Assertions.assertEquals("test-client-id-configs", ((Map) this.capturedConsumerProps.getValue()).get("client.id"));
        Assertions.assertEquals("test-client-id-configs-leader", fencableProducerProps.get("client.id"));
    }

    @Test
    public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() {
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(2);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        Assertions.assertTrue(Assertions.assertThrows(ConfigException.class, () -> {
            this.configStorage.start();
        }).getMessage().contains("required to have a single partition"));
    }

    @Test
    public void testFencableProducerPropertiesInsertedByDefault() {
        this.props.put("exactly.once.source.support", "preparing");
        this.props.put("group.id", "my-connect-cluster");
        this.props.remove("transactional.id");
        this.props.remove("enable.idempotence");
        createStore();
        Map fencableProducerProps = this.configStorage.fencableProducerProps(this.config);
        Assertions.assertEquals("connect-cluster-" + "my-connect-cluster", fencableProducerProps.get("transactional.id"));
        Assertions.assertEquals("true", fencableProducerProps.get("enable.idempotence"));
    }

    @Test
    public void testConsumerPropertiesInsertedByDefaultWithExactlyOnceSourceEnabled() {
        this.props.put("exactly.once.source.support", "enabled");
        this.props.remove("isolation.level");
        createStore();
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        Assertions.assertEquals(IsolationLevel.READ_COMMITTED.toString(), ((Map) this.capturedConsumerProps.getValue()).get("isolation.level"));
    }

    @Test
    public void testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() {
        this.props.put("exactly.once.source.support", "enabled");
        this.props.put("isolation.level", IsolationLevel.READ_UNCOMMITTED.toString());
        createStore();
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        Assertions.assertEquals(IsolationLevel.READ_COMMITTED.toString(), ((Map) this.capturedConsumerProps.getValue()).get("isolation.level"));
    }

    @Test
    public void testConsumerPropertiesNotInsertedByDefaultWithoutExactlyOnceSourceEnabled() {
        this.props.put("exactly.once.source.support", "preparing");
        this.props.remove("isolation.level");
        createStore();
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        Assertions.assertNull(((Map) this.capturedConsumerProps.getValue()).get("isolation.level"));
    }

    @Test
    public void testBackgroundConnectorDeletion() throws Exception {
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(1));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        this.logOffset = 5L;
        expectStart(asList, linkedHashMap);
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(TargetState.STARTED, snapshot.targetState(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(0), snapshot.connectorConfig(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(0), snapshot.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 0)));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(1), snapshot.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 1)));
        Assertions.assertEquals(2, snapshot.taskCount(CONNECTOR_IDS.get(0)));
        LinkedHashMap<String, byte[]> linkedHashMap2 = new LinkedHashMap<>();
        linkedHashMap2.put(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        linkedHashMap2.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(1));
        ((KafkaBasedLog) Mockito.doAnswer(expectReadToEnd(linkedHashMap2)).when(this.configLog)).readToEnd();
        Map<String, Struct> hashMap = new HashMap<>();
        hashMap.put(CONNECTOR_CONFIG_KEYS.get(0), null);
        hashMap.put(TARGET_STATE_KEYS.get(0), null);
        expectRead(linkedHashMap2, hashMap);
        this.configStorage.refresh(0L, TimeUnit.SECONDS);
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onConnectorConfigRemove(CONNECTOR_IDS.get(0));
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assertions.assertFalse(snapshot2.contains(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(0, snapshot2.taskCount(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(Collections.emptyMap(), this.configStorage.deferredTaskUpdates);
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() {
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), new RecordHeaders(), Optional.empty()));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), TASK_CONFIG_STRUCTS.get(1));
        this.logOffset = 6L;
        expectStart(asList, linkedHashMap);
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(6L, snapshot.offset());
        Assertions.assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList(snapshot.connectors()));
        Assertions.assertEquals(Collections.emptyList(), snapshot.tasks(CONNECTOR_IDS.get(0)));
        Assertions.assertNull(snapshot.taskConfig(TASK_IDS.get(0)));
        Assertions.assertNull(snapshot.taskConfig(TASK_IDS.get(1)));
        Assertions.assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), snapshot.inconsistentConnectors());
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        linkedHashMap2.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
        ((KafkaBasedLog) Mockito.doAnswer(expectReadToEnd(new LinkedHashMap<>())).doAnswer(expectReadToEnd(new LinkedHashMap<>())).doAnswer(expectReadToEnd(linkedHashMap2)).when(this.configLog)).readToEnd();
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), "properties", SAMPLE_CONFIGS.get(0));
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), "tasks", 1);
        this.configStorage.putTaskConfigs(CONNECTOR_1_NAME, Collections.singletonList(SAMPLE_CONFIGS.get(0)));
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assertions.assertEquals(8L, snapshot2.offset());
        Assertions.assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList(snapshot2.connectors()));
        Assertions.assertEquals(Collections.singletonList(TASK_IDS.get(0)), snapshot2.tasks(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(0), snapshot2.taskConfig(TASK_IDS.get(0)));
        Assertions.assertEquals(Collections.EMPTY_SET, snapshot2.inconsistentConnectors());
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onTaskConfigUpdate(Collections.singletonList(TASK_IDS.get(0)));
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testPutRestartRequestOnlyFailed() {
        testPutRestartRequest(new RestartRequest(CONNECTOR_IDS.get(0), true, false));
    }

    @Test
    public void testPutRestartRequestOnlyFailedIncludingTasks() {
        testPutRestartRequest(new RestartRequest(CONNECTOR_IDS.get(0), true, true));
    }

    private void testPutRestartRequest(RestartRequest restartRequest) {
        expectStart(Collections.emptyList(), Collections.emptyMap());
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).start();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        ((KafkaBasedLog) Mockito.doAnswer(expectReadToEnd(linkedHashMap)).when(this.configLog)).readToEnd();
        expectConvertWriteRead2(RESTART_CONNECTOR_KEYS.get(0), KafkaConfigBackingStore.RESTART_REQUEST_V0, CONFIGS_SERIALIZED.get(0), new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put("only-failed", Boolean.valueOf(restartRequest.onlyFailed())).put("include-tasks", Boolean.valueOf(restartRequest.includeTasks())));
        this.configStorage.putRestartRequest(restartRequest);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(RestartRequest.class);
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onRestartRequest((RestartRequest) forClass.capture());
        Assertions.assertEquals(restartRequest.connectorName(), ((RestartRequest) forClass.getValue()).connectorName());
        Assertions.assertEquals(Boolean.valueOf(restartRequest.onlyFailed()), Boolean.valueOf(((RestartRequest) forClass.getValue()).onlyFailed()));
        Assertions.assertEquals(Boolean.valueOf(restartRequest.includeTasks()), Boolean.valueOf(((RestartRequest) forClass.getValue()).includeTasks()));
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testRestoreRestartRequestInconsistentState() {
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), RESTART_REQUEST_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), RESTART_REQUEST_STRUCTS.get(1));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), RESTART_REQUEST_STRUCTS.get(2));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), null);
        this.logOffset = 4L;
        expectStart(asList, linkedHashMap);
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).start();
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener, Mockito.never())).onConnectorConfigRemove(ArgumentMatchers.anyString());
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener, Mockito.never())).onConnectorConfigUpdate(ArgumentMatchers.anyString());
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener, Mockito.never())).onTaskConfigUpdate(ArgumentMatchers.anyCollection());
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener, Mockito.never())).onConnectorTargetStateChange(ArgumentMatchers.anyString());
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener, Mockito.never())).onSessionKeyUpdate((SessionKey) ArgumentMatchers.any(SessionKey.class));
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener, Mockito.never())).onRestartRequest((RestartRequest) ArgumentMatchers.any(RestartRequest.class));
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener, Mockito.never())).onLoggingLevelUpdate(ArgumentMatchers.anyString(), ArgumentMatchers.anyString());
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testPutTaskConfigsZeroTasks() {
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).start();
        ((KafkaBasedLog) Mockito.doAnswer(expectReadToEnd(new LinkedHashMap())).doAnswer(expectReadToEnd(Collections.singletonMap(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)))).when(this.configLog)).readToEnd();
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0), "tasks", 0);
        addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
        Assertions.assertEquals(-1L, this.configStorage.snapshot().offset());
        this.configStorage.putTaskConfigs(CONNECTOR_1_NAME, Collections.emptyList());
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(1L, snapshot.offset());
        String str = CONNECTOR_IDS.get(0);
        Assertions.assertEquals(Collections.singletonList(str), new ArrayList(snapshot.connectors()));
        Assertions.assertEquals(Collections.emptyList(), snapshot.tasks(str));
        Assertions.assertEquals(Collections.EMPTY_SET, snapshot.inconsistentConnectors());
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onTaskConfigUpdate(Collections.emptyList());
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testBackgroundUpdateTargetState() throws Exception {
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        this.logOffset = 5L;
        expectStart(asList, linkedHashMap);
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), this.configStorage.connectorTargetStates.keySet());
        Assertions.assertEquals(TargetState.STARTED, snapshot.targetState(CONNECTOR_IDS.get(0)));
        LinkedHashMap<String, byte[]> linkedHashMap2 = new LinkedHashMap<>();
        linkedHashMap2.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        linkedHashMap2.put(TARGET_STATE_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
        ((KafkaBasedLog) Mockito.doAnswer(expectReadToEnd(linkedHashMap2)).when(this.configLog)).readToEnd();
        Map<String, Struct> hashMap = new HashMap<>();
        hashMap.put(TARGET_STATE_KEYS.get(0), TARGET_STATE_PAUSED);
        hashMap.put(TARGET_STATE_KEYS.get(1), TARGET_STATE_STOPPED);
        expectRead(linkedHashMap2, hashMap);
        this.configStorage.refresh(0L, TimeUnit.SECONDS);
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assertions.assertEquals(new HashSet(CONNECTOR_IDS), this.configStorage.connectorTargetStates.keySet());
        Assertions.assertEquals(TargetState.PAUSED, snapshot2.targetState(CONNECTOR_IDS.get(0)));
        Assertions.assertEquals(TargetState.STOPPED, snapshot2.targetState(CONNECTOR_IDS.get(1)));
        this.configStorage.stop();
        ((KafkaConfigBackingStore) Mockito.verify(this.configStorage)).stop();
    }

    @Test
    public void testSameTargetState() {
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        this.logOffset = 5L;
        expectStart(asList, linkedHashMap);
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).start();
        Assertions.assertEquals(TargetState.STARTED, this.configStorage.snapshot().targetState(CONNECTOR_IDS.get(0)));
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener, Mockito.never())).onConnectorTargetStateChange(ArgumentMatchers.anyString());
        this.configStorage.stop();
        ((KafkaConfigBackingStore) Mockito.verify(this.configStorage)).stop();
    }

    @Test
    public void testPutLogLevel() throws Exception {
        Struct put = new Struct(KafkaConfigBackingStore.LOGGER_LEVEL_V0).put("level", "ERROR");
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, "logger-cluster-org.apache.zookeeper", CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, "logger-cluster-org.apache.cassandra", CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), put);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), null);
        this.logOffset = 2L;
        expectStart(asList, linkedHashMap);
        Mockito.when(Integer.valueOf(this.configLog.partitionCount())).thenReturn(1);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).start();
        expectConvertWriteRead("logger-cluster-org.apache.kafka.clients", KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(2), "level", "WARN");
        this.configStorage.putLoggerLevel("org.apache.kafka.clients", "WARN");
        expectConvertWriteRead("logger-cluster-org.apache.kafka.connect", KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(3), "level", "DEBUG");
        this.configStorage.putLoggerLevel("org.apache.kafka.connect", "DEBUG");
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put("logger-cluster-org.apache.kafka.clients", CONFIGS_SERIALIZED.get(2));
        linkedHashMap2.put("logger-cluster-org.apache.kafka.connect", CONFIGS_SERIALIZED.get(3));
        ((KafkaBasedLog) Mockito.doAnswer(expectReadToEnd(linkedHashMap2)).when(this.configLog)).readToEnd();
        this.configStorage.refresh(0L, TimeUnit.SECONDS);
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onLoggingLevelUpdate("org.apache.kafka.clients", "WARN");
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onLoggingLevelUpdate("org.apache.kafka.connect", "DEBUG");
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testTaskCountRecordsAndGenerations() {
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).start();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        linkedHashMap.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
        linkedHashMap.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
        ((KafkaBasedLog) Mockito.doAnswer(expectReadToEnd(new LinkedHashMap<>())).doAnswer(expectReadToEnd(new LinkedHashMap<>())).doAnswer(expectReadToEnd(linkedHashMap)).doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() { // from class: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.3
            {
                put(KafkaConfigBackingStoreTest.CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), KafkaConfigBackingStoreTest.CONFIGS_SERIALIZED.get(3));
            }
        })).when(this.configLog)).readToEnd();
        expectConvertWriteRead2(TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)));
        expectConvertWriteRead2(TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)));
        expectConvertWriteRead2(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2));
        expectConvertWriteRead2(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONFIGS_SERIALIZED.get(3), new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 4));
        addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
        String str = CONNECTOR_IDS.get(0);
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertFalse(snapshot.pendingFencing(str));
        Assertions.assertNull(snapshot.taskCountRecord(str));
        Assertions.assertNull(snapshot.taskConfigGeneration(str));
        this.configStorage.putTaskConfigs(CONNECTOR_1_NAME, Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1)));
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assertions.assertEquals(3L, snapshot2.offset());
        Assertions.assertTrue(snapshot2.pendingFencing(str));
        Assertions.assertNull(snapshot2.taskCountRecord(str));
        Assertions.assertEquals(0L, snapshot2.taskConfigGeneration(str).intValue());
        this.configStorage.putTaskCountRecord(str, 4);
        ClusterConfigState snapshot3 = this.configStorage.snapshot();
        Assertions.assertEquals(4L, snapshot3.offset());
        Assertions.assertFalse(snapshot3.pendingFencing(str));
        Assertions.assertEquals(4L, snapshot3.taskCountRecord(str).intValue());
        Assertions.assertEquals(0L, snapshot3.taskConfigGeneration(str).intValue());
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testPutTaskConfigs() {
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).start();
        ((KafkaBasedLog) Mockito.doAnswer(expectReadToEnd(new LinkedHashMap())).doAnswer(expectReadToEnd(new LinkedHashMap())).doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() { // from class: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.4
            {
                put(KafkaConfigBackingStoreTest.TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStoreTest.CONFIGS_SERIALIZED.get(0));
                put(KafkaConfigBackingStoreTest.TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStoreTest.CONFIGS_SERIALIZED.get(1));
                put(KafkaConfigBackingStoreTest.COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStoreTest.CONFIGS_SERIALIZED.get(2));
            }
        })).when(this.configLog)).readToEnd();
        expectConvertWriteRead2(TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)));
        expectConvertWriteRead2(TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)));
        expectConvertWriteRead2(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2));
        addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(-1L, snapshot.offset());
        Assertions.assertNull(snapshot.taskConfig(TASK_IDS.get(0)));
        Assertions.assertNull(snapshot.taskConfig(TASK_IDS.get(1)));
        this.configStorage.putTaskConfigs(CONNECTOR_1_NAME, Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1)));
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assertions.assertEquals(3L, snapshot2.offset());
        String str = CONNECTOR_IDS.get(0);
        Assertions.assertEquals(Collections.singletonList(str), new ArrayList(snapshot2.connectors()));
        Assertions.assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), snapshot2.tasks(str));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(0), snapshot2.taskConfig(TASK_IDS.get(0)));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(1), snapshot2.taskConfig(TASK_IDS.get(1)));
        Assertions.assertEquals(Collections.EMPTY_SET, snapshot2.inconsistentConnectors());
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    @Test
    public void testPutTaskConfigsStartsOnlyReconfiguredTasks() {
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, this.config);
        verifyConfigure();
        this.configStorage.start();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).start();
        ((KafkaBasedLog) Mockito.doAnswer(expectReadToEnd(new LinkedHashMap())).doAnswer(expectReadToEnd(new LinkedHashMap())).doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() { // from class: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.5
            {
                put(KafkaConfigBackingStoreTest.TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStoreTest.CONFIGS_SERIALIZED.get(0));
                put(KafkaConfigBackingStoreTest.TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStoreTest.CONFIGS_SERIALIZED.get(1));
                put(KafkaConfigBackingStoreTest.COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStoreTest.CONFIGS_SERIALIZED.get(2));
            }
        })).doAnswer(expectReadToEnd(new LinkedHashMap())).doAnswer(expectReadToEnd(new LinkedHashMap())).doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() { // from class: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.6
            {
                put(KafkaConfigBackingStoreTest.TASK_CONFIG_KEYS.get(2), KafkaConfigBackingStoreTest.CONFIGS_SERIALIZED.get(3));
                put(KafkaConfigBackingStoreTest.COMMIT_TASKS_CONFIG_KEYS.get(1), KafkaConfigBackingStoreTest.CONFIGS_SERIALIZED.get(4));
            }
        })).when(this.configLog)).readToEnd();
        expectConvertWriteRead2(TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)));
        expectConvertWriteRead2(TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)));
        expectConvertWriteRead2(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2));
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assertions.assertEquals(-1L, snapshot.offset());
        Assertions.assertNull(snapshot.taskConfig(TASK_IDS.get(0)));
        Assertions.assertNull(snapshot.taskConfig(TASK_IDS.get(1)));
        addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
        this.configStorage.putTaskConfigs(CONNECTOR_1_NAME, Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1)));
        expectConvertWriteRead2(TASK_CONFIG_KEYS.get(2), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(3), new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2)));
        expectConvertWriteRead2(COMMIT_TASKS_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(4), new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 1));
        addConnector(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1), Collections.emptyList());
        this.configStorage.putTaskConfigs(CONNECTOR_2_NAME, Collections.singletonList(SAMPLE_CONFIGS.get(2)));
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assertions.assertEquals(5L, snapshot2.offset());
        String str = CONNECTOR_IDS.get(0);
        String str2 = CONNECTOR_IDS.get(1);
        Assertions.assertEquals(Arrays.asList(str, str2), new ArrayList(snapshot2.connectors()));
        Assertions.assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), snapshot2.tasks(str));
        Assertions.assertEquals(Collections.singletonList(TASK_IDS.get(2)), snapshot2.tasks(str2));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(0), snapshot2.taskConfig(TASK_IDS.get(0)));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(1), snapshot2.taskConfig(TASK_IDS.get(1)));
        Assertions.assertEquals(SAMPLE_CONFIGS.get(2), snapshot2.taskConfig(TASK_IDS.get(2)));
        Assertions.assertEquals(Collections.EMPTY_SET, snapshot2.inconsistentConnectors());
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
        ((ConfigBackingStore.UpdateListener) Mockito.verify(this.configUpdateListener)).onTaskConfigUpdate(Collections.singletonList(TASK_IDS.get(2)));
        this.configStorage.stop();
        ((KafkaBasedLog) Mockito.verify(this.configLog)).stop();
    }

    private void verifyConfigure() {
        ((KafkaConfigBackingStore) Mockito.verify(this.configStorage)).createKafkaBasedLog((String) this.capturedTopic.capture(), (Map) this.capturedProducerProps.capture(), (Map) this.capturedConsumerProps.capture(), (Callback) this.capturedConsumedCallback.capture(), (NewTopic) this.capturedNewTopic.capture(), (Supplier) this.capturedAdminSupplier.capture(), (WorkerConfig) ArgumentMatchers.any(WorkerConfig.class), (Time) ArgumentMatchers.any(Time.class));
    }

    private void expectStart(List<ConsumerRecord<String, byte[]>> list, Map<byte[], Struct> map) {
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                ((Callback) this.capturedConsumedCallback.getValue()).onCompletion((Throwable) null, (ConsumerRecord) it.next());
            }
            return null;
        }).when(this.configLog)).start();
        for (Map.Entry<byte[], Struct> entry : map.entrySet()) {
            Mockito.when(this.converter.toConnectData(TOPIC, entry.getKey())).thenReturn(new SchemaAndValue((Schema) null, structToMap(entry.getValue())));
        }
    }

    private void expectConvertWriteRead2(String str, Schema schema, byte[] bArr, Struct struct) {
        ((Converter) Mockito.doReturn(bArr).when(this.converter)).fromConnectData((String) ArgumentMatchers.eq(TOPIC), (Schema) ArgumentMatchers.eq(schema), ArgumentMatchers.eq(struct));
        ((KafkaBasedLog) Mockito.doReturn(this.producerFuture).when(this.configLog)).sendWithReceipt((String) ArgumentMatchers.eq(str), (byte[]) ArgumentMatchers.eq(bArr));
        ((Converter) Mockito.doReturn(new SchemaAndValue((Schema) null, structToMap(struct))).when(this.converter)).toConnectData((String) ArgumentMatchers.eq(TOPIC), (byte[]) ArgumentMatchers.eq(bArr));
    }

    private void expectConvertWriteRead(String str, Schema schema, byte[] bArr, String str2, Object obj) {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Struct.class);
        Mockito.when(this.converter.fromConnectData((String) ArgumentMatchers.eq(TOPIC), (Schema) ArgumentMatchers.eq(schema), forClass.capture())).thenReturn(bArr);
        Mockito.when(this.configLog.sendWithReceipt(str, bArr)).thenReturn(this.producerFuture);
        Mockito.when(this.converter.toConnectData(TOPIC, bArr)).thenAnswer(invocationOnMock -> {
            Assertions.assertEquals(obj, ((Struct) forClass.getValue()).get(str2));
            return new SchemaAndValue((Schema) null, structToMap((Struct) forClass.getValue()));
        });
    }

    private void expectRead(LinkedHashMap<String, byte[]> linkedHashMap, Map<String, Struct> map) {
        for (Map.Entry<String, Struct> entry : map.entrySet()) {
            Mockito.when(this.converter.toConnectData(TOPIC, linkedHashMap.get(entry.getKey()))).thenReturn(new SchemaAndValue((Schema) null, structToMap(entry.getValue())));
        }
    }

    private Answer<Future<Void>> expectReadToEnd(Map<String, byte[]> map) {
        return invocationOnMock -> {
            for (Map.Entry entry : map.entrySet()) {
                Callback callback = (Callback) this.capturedConsumedCallback.getValue();
                long j = this.logOffset;
                this.logOffset = j + 1;
                callback.onCompletion((Throwable) null, new ConsumerRecord(TOPIC, 0, j, 0L, TimestampType.CREATE_TIME, 0, 0, (String) entry.getKey(), (byte[]) entry.getValue(), new RecordHeaders(), Optional.empty()));
            }
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.complete(null);
            return completableFuture;
        };
    }

    private Map<String, Object> structToMap(Struct struct) {
        if (struct == null) {
            return null;
        }
        HashMap hashMap = new HashMap();
        for (Field field : struct.schema().fields()) {
            hashMap.put(field.name(), struct.get(field));
        }
        return hashMap;
    }

    private void addConnector(String str, Map<String, String> map, List<Map<String, String>> list) {
        for (int i = 0; i < list.size(); i++) {
            this.configStorage.taskConfigs.put(new ConnectorTaskId(str, i), list.get(i));
        }
        this.configStorage.connectorConfigs.put(str, map);
        this.configStorage.connectorTaskCounts.put(str, Integer.valueOf(list.size()));
    }

    /* JADX WARN: Type inference failed for: r0v69, types: [byte[], java.lang.Object[]] */
    /* JADX WARN: Type inference failed for: r0v76, types: [byte[], java.lang.Object[]] */
    static {
        DEFAULT_CONFIG_STORAGE_PROPS.put("config.storage.topic", TOPIC);
        DEFAULT_CONFIG_STORAGE_PROPS.put("offset.storage.topic", "connect-offsets");
        DEFAULT_CONFIG_STORAGE_PROPS.put("config.storage.replication.factor", Short.toString((short) 5));
        DEFAULT_CONFIG_STORAGE_PROPS.put("group.id", "connect");
        DEFAULT_CONFIG_STORAGE_PROPS.put("status.storage.topic", "status-topic");
        DEFAULT_CONFIG_STORAGE_PROPS.put("bootstrap.servers", "broker1:9092,broker2:9093");
        DEFAULT_CONFIG_STORAGE_PROPS.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        DEFAULT_CONFIG_STORAGE_PROPS.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        CONNECTOR_IDS = Arrays.asList(CONNECTOR_1_NAME, CONNECTOR_2_NAME);
        CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2");
        COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2");
        TARGET_STATE_KEYS = Arrays.asList("target-state-connector1", "target-state-connector2");
        CONNECTOR_TASK_COUNT_RECORD_KEYS = Arrays.asList("tasks-fencing-connector1", "tasks-fencing-connector2");
        RESTART_CONNECTOR_KEYS = Arrays.asList(KafkaConfigBackingStore.RESTART_KEY(CONNECTOR_1_NAME), KafkaConfigBackingStore.RESTART_KEY(CONNECTOR_2_NAME));
        TASK_IDS = Arrays.asList(new ConnectorTaskId(CONNECTOR_1_NAME, 0), new ConnectorTaskId(CONNECTOR_1_NAME, 1), new ConnectorTaskId(CONNECTOR_2_NAME, 0));
        TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0");
        SAMPLE_CONFIGS = Arrays.asList(Collections.singletonMap("config-key-one", "config-value-one"), Collections.singletonMap("config-key-two", "config-value-two"), Collections.singletonMap("config-key-three", "config-value-three"));
        TASK_CONFIG_STRUCTS = Arrays.asList(new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)));
        ONLY_FAILED_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put("include-tasks", false);
        INCLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put("only-failed", true);
        RESTART_REQUEST_STRUCTS = Arrays.asList(new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put("only-failed", true).put("include-tasks", false), ONLY_FAILED_MISSING_STRUCT, INCLUDE_TASKS_MISSING_STRUCT);
        CONNECTOR_CONFIG_STRUCTS = Arrays.asList(new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)), new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2)));
        TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1).put("state", "PAUSED").put("state.v2", "PAUSED");
        TARGET_STATE_PAUSED_LEGACY = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED");
        TARGET_STATE_STOPPED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1).put("state", "PAUSED").put("state.v2", "STOPPED");
        CONNECTOR_TASK_COUNT_RECORD_STRUCTS = Arrays.asList(new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6), new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9));
        CONFIGS_SERIALIZED = Arrays.asList(new byte[]{"config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(), "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(), "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()});
        TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
        TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0);
        TARGET_STATES_SERIALIZED = Arrays.asList(new byte[]{"started".getBytes(), "paused".getBytes(), "stopped".getBytes()});
    }
}
