package org.apache.kafka.connect.storage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
@PrepareForTest({KafkaConfigBackingStore.class, ConnectUtils.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
/* loaded from: input_file:org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.class */
public class KafkaConfigBackingStoreTest {
    private static final String TOPIC = "connect-configs";
    private static final short TOPIC_REPLICATION_FACTOR = 5;
    private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap();
    private static final DistributedConfig DEFAULT_DISTRIBUTED_CONFIG;
    private static final List<String> CONNECTOR_IDS;
    private static final List<String> CONNECTOR_CONFIG_KEYS;
    private static final List<String> COMMIT_TASKS_CONFIG_KEYS;
    private static final List<String> TARGET_STATE_KEYS;
    private static final List<ConnectorTaskId> TASK_IDS;
    private static final List<String> TASK_CONFIG_KEYS;
    private static final List<Map<String, String>> SAMPLE_CONFIGS;
    private static final List<Struct> CONNECTOR_CONFIG_STRUCTS;
    private static final List<Struct> TASK_CONFIG_STRUCTS;
    private static final Struct TARGET_STATE_PAUSED;
    private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR;
    private static final Struct TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR;
    private static final List<byte[]> CONFIGS_SERIALIZED;

    @Mock
    private Converter converter;

    @Mock
    private ConfigBackingStore.UpdateListener configUpdateListener;

    @Mock
    KafkaBasedLog<String, byte[]> storeLog;
    private KafkaConfigBackingStore configStorage;
    private Capture<String> capturedTopic = EasyMock.newCapture();
    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
    private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
    private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
    private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
    private long logOffset = 0;

    public KafkaConfigBackingStoreTest() {
    }

    @Before
    public void setUp() {
        PowerMock.mockStaticPartial(ConnectUtils.class, new String[]{"lookupKafkaClusterId"});
        EasyMock.expect(ConnectUtils.lookupKafkaClusterId((WorkerConfig) EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
        PowerMock.replay(new Object[]{ConnectUtils.class});
        this.configStorage = (KafkaConfigBackingStore) PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, new Object[]{this.converter, DEFAULT_DISTRIBUTED_CONFIG, null});
        Whitebox.setInternalState(this.configStorage, "configLog", this.storeLog);
        this.configStorage.setUpdateListener(this.configUpdateListener);
    }

    @Test
    public void testStartStop() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        HashMap hashMap = new HashMap(DEFAULT_CONFIG_STORAGE_PROPS);
        hashMap.put("config.storage.min.insync.replicas", "3");
        hashMap.put("config.storage.max.message.bytes", "1001");
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, new DistributedConfig(hashMap));
        Assert.assertEquals(TOPIC, this.capturedTopic.getValue());
        Assert.assertEquals("org.apache.kafka.common.serialization.StringSerializer", ((Map) this.capturedProducerProps.getValue()).get("key.serializer"));
        Assert.assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", ((Map) this.capturedProducerProps.getValue()).get("value.serializer"));
        Assert.assertEquals("org.apache.kafka.common.serialization.StringDeserializer", ((Map) this.capturedConsumerProps.getValue()).get("key.deserializer"));
        Assert.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", ((Map) this.capturedConsumerProps.getValue()).get("value.deserializer"));
        Assert.assertEquals(TOPIC, ((NewTopic) this.capturedNewTopic.getValue()).name());
        Assert.assertEquals(1L, ((NewTopic) this.capturedNewTopic.getValue()).numPartitions());
        Assert.assertEquals(5L, ((NewTopic) this.capturedNewTopic.getValue()).replicationFactor());
        Assert.assertEquals("3", ((NewTopic) this.capturedNewTopic.getValue()).configs().get("min.insync.replicas"));
        Assert.assertEquals("1001", ((NewTopic) this.capturedNewTopic.getValue()).configs().get("max.message.bytes"));
        this.configStorage.start();
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testPutConnectorConfig() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        expectConvertWriteAndRead(CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), "properties", SAMPLE_CONFIGS.get(0));
        this.configUpdateListener.onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
        EasyMock.expectLastCall();
        expectConvertWriteAndRead(CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), "properties", SAMPLE_CONFIGS.get(1));
        this.configUpdateListener.onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
        EasyMock.expectLastCall();
        expectConnectorRemoval(CONNECTOR_CONFIG_KEYS.get(1), TARGET_STATE_KEYS.get(1));
        this.configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1));
        EasyMock.expectLastCall();
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assert.assertEquals(-1L, snapshot.offset());
        Assert.assertNull(snapshot.connectorConfig(CONNECTOR_IDS.get(0)));
        Assert.assertNull(snapshot.connectorConfig(CONNECTOR_IDS.get(1)));
        this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assert.assertEquals(1L, snapshot2.offset());
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), snapshot2.connectorConfig(CONNECTOR_IDS.get(0)));
        Assert.assertNull(snapshot2.connectorConfig(CONNECTOR_IDS.get(1)));
        this.configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1));
        ClusterConfigState snapshot3 = this.configStorage.snapshot();
        Assert.assertEquals(2L, snapshot3.offset());
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), snapshot3.connectorConfig(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(1), snapshot3.connectorConfig(CONNECTOR_IDS.get(1)));
        this.configStorage.removeConnectorConfig(CONNECTOR_IDS.get(1));
        ClusterConfigState snapshot4 = this.configStorage.snapshot();
        Assert.assertEquals(4L, snapshot4.offset());
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), snapshot4.connectorConfig(CONNECTOR_IDS.get(0)));
        Assert.assertNull(snapshot4.connectorConfig(CONNECTOR_IDS.get(1)));
        Assert.assertNull(snapshot4.targetState(CONNECTOR_IDS.get(1)));
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testPutTaskConfigs() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), "properties", SAMPLE_CONFIGS.get(0));
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), "properties", SAMPLE_CONFIGS.get(1));
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), "tasks", 2);
        this.configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
        EasyMock.expectLastCall();
        LinkedHashMap<String, byte[]> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        linkedHashMap.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
        linkedHashMap.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
        expectReadToEnd(linkedHashMap);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        this.configStorage.start();
        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assert.assertEquals(-1L, snapshot.offset());
        Assert.assertNull(snapshot.taskConfig(TASK_IDS.get(0)));
        Assert.assertNull(snapshot.taskConfig(TASK_IDS.get(1)));
        this.configStorage.putTaskConfigs("connector1", Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1)));
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assert.assertEquals(3L, snapshot2.offset());
        String str = CONNECTOR_IDS.get(0);
        Assert.assertEquals(Arrays.asList(str), new ArrayList(snapshot2.connectors()));
        Assert.assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), snapshot2.tasks(str));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), snapshot2.taskConfig(TASK_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(1), snapshot2.taskConfig(TASK_IDS.get(1)));
        Assert.assertEquals(Collections.EMPTY_SET, snapshot2.inconsistentConnectors());
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testPutTaskConfigsStartsOnlyReconfiguredTasks() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), "properties", SAMPLE_CONFIGS.get(0));
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), "properties", SAMPLE_CONFIGS.get(1));
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), "tasks", 2);
        this.configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
        EasyMock.expectLastCall();
        LinkedHashMap<String, byte[]> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        linkedHashMap.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
        linkedHashMap.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
        expectReadToEnd(linkedHashMap);
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(2), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(3), "properties", SAMPLE_CONFIGS.get(2));
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(4), "tasks", 1);
        this.configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(2)));
        EasyMock.expectLastCall();
        LinkedHashMap<String, byte[]> linkedHashMap2 = new LinkedHashMap<>();
        linkedHashMap2.put(TASK_CONFIG_KEYS.get(2), CONFIGS_SERIALIZED.get(3));
        linkedHashMap2.put(COMMIT_TASKS_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(4));
        expectReadToEnd(linkedHashMap2);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        this.configStorage.start();
        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
        whiteboxAddConnector(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1), Collections.emptyList());
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assert.assertEquals(-1L, snapshot.offset());
        Assert.assertNull(snapshot.taskConfig(TASK_IDS.get(0)));
        Assert.assertNull(snapshot.taskConfig(TASK_IDS.get(1)));
        this.configStorage.putTaskConfigs("connector1", Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1)));
        this.configStorage.putTaskConfigs("connector2", Collections.singletonList(SAMPLE_CONFIGS.get(2)));
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assert.assertEquals(5L, snapshot2.offset());
        String str = CONNECTOR_IDS.get(0);
        String str2 = CONNECTOR_IDS.get(1);
        Assert.assertEquals(Arrays.asList(str, str2), new ArrayList(snapshot2.connectors()));
        Assert.assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), snapshot2.tasks(str));
        Assert.assertEquals(Collections.singletonList(TASK_IDS.get(2)), snapshot2.tasks(str2));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), snapshot2.taskConfig(TASK_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(1), snapshot2.taskConfig(TASK_IDS.get(1)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(2), snapshot2.taskConfig(TASK_IDS.get(2)));
        Assert.assertEquals(Collections.EMPTY_SET, snapshot2.inconsistentConnectors());
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testPutTaskConfigsZeroTasks() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0), "tasks", 0);
        this.configUpdateListener.onTaskConfigUpdate(Collections.emptyList());
        EasyMock.expectLastCall();
        LinkedHashMap<String, byte[]> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        expectReadToEnd(linkedHashMap);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        this.configStorage.start();
        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
        Assert.assertEquals(-1L, this.configStorage.snapshot().offset());
        this.configStorage.putTaskConfigs("connector1", Collections.emptyList());
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assert.assertEquals(1L, snapshot.offset());
        String str = CONNECTOR_IDS.get(0);
        Assert.assertEquals(Arrays.asList(str), new ArrayList(snapshot.connectors()));
        Assert.assertEquals(Collections.emptyList(), snapshot.tasks(str));
        Assert.assertEquals(Collections.EMPTY_SET, snapshot.inconsistentConnectors());
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testRestoreTargetState() throws Exception {
        expectConfigure();
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        this.logOffset = 5L;
        expectStart(asList, linkedHashMap);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assert.assertEquals(5L, snapshot.offset());
        Assert.assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList(snapshot.connectors()));
        Assert.assertEquals(TargetState.PAUSED, snapshot.targetState(CONNECTOR_IDS.get(0)));
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testBackgroundUpdateTargetState() throws Exception {
        expectConfigure();
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        this.logOffset = 5L;
        expectStart(asList, linkedHashMap);
        expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_PAUSED);
        this.configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
        EasyMock.expectLastCall();
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        this.configStorage.start();
        Assert.assertEquals(TargetState.STARTED, this.configStorage.snapshot().targetState(CONNECTOR_IDS.get(0)));
        this.configStorage.refresh(0L, TimeUnit.SECONDS);
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testBackgroundConnectorDeletion() throws Exception {
        expectConfigure();
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(1));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        this.logOffset = 5L;
        expectStart(asList, linkedHashMap);
        LinkedHashMap<String, byte[]> linkedHashMap2 = new LinkedHashMap<>();
        linkedHashMap2.put(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        linkedHashMap2.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(1));
        Map<String, Struct> hashMap = new HashMap<>();
        hashMap.put(CONNECTOR_CONFIG_KEYS.get(0), null);
        hashMap.put(TARGET_STATE_KEYS.get(0), null);
        expectRead(linkedHashMap2, hashMap);
        this.configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0));
        EasyMock.expectLastCall();
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assert.assertEquals(TargetState.STARTED, snapshot.targetState(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), snapshot.connectorConfig(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.subList(0, 2), snapshot.allTaskConfigs(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(2L, snapshot.taskCount(CONNECTOR_IDS.get(0)));
        this.configStorage.refresh(0L, TimeUnit.SECONDS);
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assert.assertFalse(snapshot2.contains(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(Collections.emptyList(), snapshot2.allTaskConfigs(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(0L, snapshot2.taskCount(CONNECTOR_IDS.get(0)));
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testRestoreTargetStateUnexpectedDeletion() throws Exception {
        expectConfigure();
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), null);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        this.logOffset = 5L;
        expectStart(asList, linkedHashMap);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assert.assertEquals(5L, snapshot.offset());
        Assert.assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList(snapshot.connectors()));
        Assert.assertEquals(TargetState.STARTED, snapshot.targetState(CONNECTOR_IDS.get(0)));
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testRestore() throws Exception {
        expectConfigure();
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR)), new ConsumerRecord(TOPIC, 0, 6L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), CONNECTOR_CONFIG_STRUCTS.get(2));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
        this.logOffset = 7L;
        expectStart(asList, linkedHashMap);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assert.assertEquals(7L, snapshot.offset());
        Assert.assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList(snapshot.connectors()));
        Assert.assertEquals(TargetState.STARTED, snapshot.targetState(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(2), snapshot.connectorConfig(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), snapshot.tasks(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), snapshot.taskConfig(TASK_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), snapshot.taskConfig(TASK_IDS.get(1)));
        Assert.assertEquals(Collections.EMPTY_SET, snapshot.inconsistentConnectors());
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testRestoreConnectorDeletion() throws Exception {
        expectConfigure();
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR)));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), null);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(4), null);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        this.logOffset = 6L;
        expectStart(asList, linkedHashMap);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assert.assertEquals(6L, snapshot.offset());
        Assert.assertTrue(snapshot.connectors().isEmpty());
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testRestoreZeroTasks() throws Exception {
        expectConfigure();
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), new ConsumerRecord(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR)), new ConsumerRecord(TOPIC, 0, 6L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)), new ConsumerRecord(TOPIC, 0, 7L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7)));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), CONNECTOR_CONFIG_STRUCTS.get(2));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
        this.logOffset = 8L;
        expectStart(asList, linkedHashMap);
        expectPartitionCount(1);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assert.assertEquals(8L, snapshot.offset());
        Assert.assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList(snapshot.connectors()));
        Assert.assertEquals(SAMPLE_CONFIGS.get(2), snapshot.connectorConfig(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(Collections.emptyList(), snapshot.tasks(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(Collections.EMPTY_SET, snapshot.inconsistentConnectors());
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
        expectConfigure();
        List<ConsumerRecord<String, byte[]>> asList = Arrays.asList(new ConsumerRecord(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), new ConsumerRecord(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), new ConsumerRecord(TOPIC, 0, 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR)));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
        linkedHashMap.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
        linkedHashMap.put(CONFIGS_SERIALIZED.get(TOPIC_REPLICATION_FACTOR), TASK_CONFIG_STRUCTS.get(1));
        this.logOffset = 6L;
        expectStart(asList, linkedHashMap);
        expectPartitionCount(1);
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), "properties", SAMPLE_CONFIGS.get(0));
        expectReadToEnd(new LinkedHashMap<>());
        expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2), "tasks", 1);
        this.configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0)));
        EasyMock.expectLastCall();
        LinkedHashMap<String, byte[]> linkedHashMap2 = new LinkedHashMap<>();
        linkedHashMap2.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
        linkedHashMap2.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
        expectReadToEnd(linkedHashMap2);
        expectStop();
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        this.configStorage.start();
        ClusterConfigState snapshot = this.configStorage.snapshot();
        Assert.assertEquals(6L, snapshot.offset());
        Assert.assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList(snapshot.connectors()));
        Assert.assertEquals(Collections.emptyList(), snapshot.tasks(CONNECTOR_IDS.get(0)));
        Assert.assertNull(snapshot.taskConfig(TASK_IDS.get(0)));
        Assert.assertNull(snapshot.taskConfig(TASK_IDS.get(1)));
        Assert.assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), snapshot.inconsistentConnectors());
        this.configStorage.putTaskConfigs("connector1", Collections.singletonList(SAMPLE_CONFIGS.get(0)));
        ClusterConfigState snapshot2 = this.configStorage.snapshot();
        Assert.assertEquals(8L, snapshot2.offset());
        Assert.assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList(snapshot2.connectors()));
        Assert.assertEquals(Arrays.asList(TASK_IDS.get(0)), snapshot2.tasks(CONNECTOR_IDS.get(0)));
        Assert.assertEquals(SAMPLE_CONFIGS.get(0), snapshot2.taskConfig(TASK_IDS.get(0)));
        Assert.assertEquals(Collections.EMPTY_SET, snapshot2.inconsistentConnectors());
        this.configStorage.stop();
        PowerMock.verifyAll();
    }

    @Test
    public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() throws Exception {
        expectConfigure();
        expectStart(Collections.emptyList(), Collections.emptyMap());
        expectPartitionCount(2);
        PowerMock.replayAll(new Object[0]);
        this.configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        Assert.assertTrue(Assert.assertThrows(ConfigException.class, () -> {
            this.configStorage.start();
        }).getMessage().contains("required to have a single partition"));
        PowerMock.verifyAll();
    }

    private void expectConfigure() throws Exception {
        PowerMock.expectPrivate(this.configStorage, "createKafkaBasedLog", new Object[]{EasyMock.capture(this.capturedTopic), EasyMock.capture(this.capturedProducerProps), EasyMock.capture(this.capturedConsumerProps), EasyMock.capture(this.capturedConsumedCallback), EasyMock.capture(this.capturedNewTopic), EasyMock.capture(this.capturedAdminSupplier)}).andReturn(this.storeLog);
    }

    private void expectPartitionCount(int i) {
        EasyMock.expect(Integer.valueOf(this.storeLog.partitionCount())).andReturn(Integer.valueOf(i));
    }

    private void expectStart(final List<ConsumerRecord<String, byte[]>> list, Map<byte[], Struct> map) {
        this.storeLog.start();
        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { // from class: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.1
            public Object answer() {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ((Callback) KafkaConfigBackingStoreTest.this.capturedConsumedCallback.getValue()).onCompletion((Throwable) null, (ConsumerRecord) it.next());
                }
                return null;
            }
        });
        for (Map.Entry<byte[], Struct> entry : map.entrySet()) {
            EasyMock.expect(this.converter.toConnectData((String) EasyMock.eq(TOPIC), EasyMock.aryEq(entry.getKey()))).andReturn(new SchemaAndValue((Schema) null, structToMap(entry.getValue())));
        }
    }

    private void expectStop() {
        this.storeLog.stop();
        PowerMock.expectLastCall();
    }

    private void expectRead(LinkedHashMap<String, byte[]> linkedHashMap, Map<String, Struct> map) {
        expectReadToEnd(linkedHashMap);
        for (Map.Entry<String, Struct> entry : map.entrySet()) {
            EasyMock.expect(this.converter.toConnectData((String) EasyMock.eq(TOPIC), EasyMock.aryEq(linkedHashMap.get(entry.getKey())))).andReturn(new SchemaAndValue((Schema) null, structToMap(entry.getValue())));
        }
    }

    private void expectRead(String str, byte[] bArr, Struct struct) {
        LinkedHashMap<String, byte[]> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(str, bArr);
        expectRead(linkedHashMap, Collections.singletonMap(str, struct));
    }

    private void expectConvertWriteRead(String str, Schema schema, final byte[] bArr, final String str2, final Object obj) {
        final Capture newCapture = EasyMock.newCapture();
        if (bArr != null) {
            EasyMock.expect(this.converter.fromConnectData((String) EasyMock.eq(TOPIC), (Schema) EasyMock.eq(schema), EasyMock.capture(newCapture))).andReturn(bArr);
        }
        this.storeLog.send(EasyMock.eq(str), EasyMock.aryEq(bArr));
        PowerMock.expectLastCall();
        EasyMock.expect(this.converter.toConnectData((String) EasyMock.eq(TOPIC), EasyMock.aryEq(bArr))).andAnswer(new IAnswer<SchemaAndValue>() { // from class: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.2
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public SchemaAndValue m125answer() throws Throwable {
                if (str2 != null) {
                    Assert.assertEquals(obj, ((Struct) newCapture.getValue()).get(str2));
                }
                return new SchemaAndValue((Schema) null, bArr == null ? null : KafkaConfigBackingStoreTest.this.structToMap((Struct) newCapture.getValue()));
            }
        });
    }

    private void expectReadToEnd(final LinkedHashMap<String, byte[]> linkedHashMap) {
        EasyMock.expect(this.storeLog.readToEnd()).andAnswer(new IAnswer<Future<Void>>() { // from class: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.3
            /*  JADX ERROR: JadxRuntimeException in pass: InlineMethods
                jadx.core.utils.exceptions.JadxRuntimeException: Failed to process method for inline: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.access$208(org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest):long
                	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:74)
                	at jadx.core.dex.visitors.InlineMethods.visit(InlineMethods.java:49)
                Caused by: jadx.core.utils.exceptions.JadxRuntimeException: Class not yet loaded at codegen stage: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest
                	at jadx.core.dex.nodes.ClassNode.reloadAtCodegenStage(ClassNode.java:883)
                	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:66)
                	... 1 more
                */
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public java.util.concurrent.Future<java.lang.Void> m126answer() {
                /*
                    r18 = this;
                    org.apache.kafka.connect.util.TestFuture r0 = new org.apache.kafka.connect.util.TestFuture
                    r1 = r0
                    r1.<init>()
                    r19 = r0
                    r0 = r18
                    java.util.LinkedHashMap r0 = r5
                    java.util.Set r0 = r0.entrySet()
                    java.util.Iterator r0 = r0.iterator()
                    r20 = r0
                L15:
                    r0 = r20
                    boolean r0 = r0.hasNext()
                    if (r0 == 0) goto L62
                    r0 = r20
                    java.lang.Object r0 = r0.next()
                    java.util.Map$Entry r0 = (java.util.Map.Entry) r0
                    r21 = r0
                    r0 = r18
                    org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest r0 = org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.this
                    org.easymock.Capture r0 = org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.access$000(r0)
                    java.lang.Object r0 = r0.getValue()
                    org.apache.kafka.connect.util.Callback r0 = (org.apache.kafka.connect.util.Callback) r0
                    r1 = 0
                    org.apache.kafka.clients.consumer.ConsumerRecord r2 = new org.apache.kafka.clients.consumer.ConsumerRecord
                    r3 = r2
                    java.lang.String r4 = "connect-configs"
                    r5 = 0
                    r6 = r18
                    org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest r6 = org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.this
                    long r6 = org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.access$208(r6)
                    r7 = 0
                    org.apache.kafka.common.record.TimestampType r8 = org.apache.kafka.common.record.TimestampType.CREATE_TIME
                    r9 = 0
                    r10 = 0
                    r11 = 0
                    r12 = r21
                    java.lang.Object r12 = r12.getKey()
                    r13 = r21
                    java.lang.Object r13 = r13.getValue()
                    r3.<init>(r4, r5, r6, r7, r8, r9, r10, r11, r12, r13)
                    r0.onCompletion(r1, r2)
                    goto L15
                L62:
                    r0 = r19
                    r1 = 0
                    java.lang.Void r1 = (java.lang.Void) r1
                    r0.resolveOnGet(r1)
                    r0 = r19
                    return r0
                */
                throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.AnonymousClass3.m126answer():java.util.concurrent.Future");
            }
        });
    }

    private void expectConnectorRemoval(String str, String str2) {
        expectConvertWriteRead(str, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null);
        expectConvertWriteRead(str2, KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null);
        LinkedHashMap<String, byte[]> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(str, null);
        linkedHashMap.put(str2, null);
        expectReadToEnd(linkedHashMap);
    }

    private void expectConvertWriteAndRead(String str, Schema schema, byte[] bArr, String str2, Object obj) {
        expectConvertWriteRead(str, schema, bArr, str2, obj);
        LinkedHashMap<String, byte[]> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(str, bArr);
        expectReadToEnd(linkedHashMap);
    }

    private void whiteboxAddConnector(String str, Map<String, String> map, List<Map<String, String>> list) {
        Map map2 = (Map) Whitebox.getInternalState(this.configStorage, "taskConfigs");
        for (int i = 0; i < list.size(); i++) {
            map2.put(new ConnectorTaskId(str, i), list.get(i));
        }
        ((Map) Whitebox.getInternalState(this.configStorage, "connectorConfigs")).put(str, map);
        ((Map) Whitebox.getInternalState(this.configStorage, "connectorTaskCounts")).put(str, Integer.valueOf(list.size()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map<String, Object> structToMap(Struct struct) {
        if (struct == null) {
            return null;
        }
        HashMap hashMap = new HashMap();
        for (Field field : struct.schema().fields()) {
            hashMap.put(field.name(), struct.get(field));
        }
        return hashMap;
    }

    /*  JADX ERROR: Failed to decode insn: 0x0005: MOVE_MULTI, method: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.access$208(org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$208(org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest r8) {
        /*
            r0 = r8
            r1 = r0
            long r1 = r1.logOffset
            // decode failed: arraycopy: source index -1 out of bounds for object array[8]
            r2 = 1
            long r1 = r1 + r2
            r0.logOffset = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest.access$208(org.apache.kafka.connect.storage.KafkaConfigBackingStoreTest):long");
    }

    /* JADX WARN: Type inference failed for: r0v56, types: [byte[], java.lang.Object[]] */
    static {
        DEFAULT_CONFIG_STORAGE_PROPS.put("config.storage.topic", TOPIC);
        DEFAULT_CONFIG_STORAGE_PROPS.put("offset.storage.topic", "connect-offsets");
        DEFAULT_CONFIG_STORAGE_PROPS.put("config.storage.replication.factor", Short.toString((short) 5));
        DEFAULT_CONFIG_STORAGE_PROPS.put("group.id", "connect");
        DEFAULT_CONFIG_STORAGE_PROPS.put("status.storage.topic", "status-topic");
        DEFAULT_CONFIG_STORAGE_PROPS.put("bootstrap.servers", "broker1:9092,broker2:9093");
        DEFAULT_CONFIG_STORAGE_PROPS.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        DEFAULT_CONFIG_STORAGE_PROPS.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        DEFAULT_CONFIG_STORAGE_PROPS.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
        DEFAULT_CONFIG_STORAGE_PROPS.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
        DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_CONFIG_STORAGE_PROPS);
        CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
        CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2");
        COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2");
        TARGET_STATE_KEYS = Arrays.asList("target-state-connector1", "target-state-connector2");
        TASK_IDS = Arrays.asList(new ConnectorTaskId("connector1", 0), new ConnectorTaskId("connector1", 1), new ConnectorTaskId("connector2", 0));
        TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0");
        SAMPLE_CONFIGS = Arrays.asList(Collections.singletonMap("config-key-one", "config-value-one"), Collections.singletonMap("config-key-two", "config-value-two"), Collections.singletonMap("config-key-three", "config-value-three"));
        CONNECTOR_CONFIG_STRUCTS = Arrays.asList(new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)), new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2)));
        TASK_CONFIG_STRUCTS = Arrays.asList(new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)));
        TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED");
        TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
        TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0);
        CONFIGS_SERIALIZED = Arrays.asList(new byte[]{"config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(), "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(), "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()});
    }
}
