package org.apache.kafka.connect.storage;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConnectorTest;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.class */
public class KafkaOffsetBackingStoreTest {
    /** Client-id prefix returned by the client-id supplier handed to the store under test. */
    private static final String CLIENT_ID_BASE = "test-client-id-";
    /** Offsets topic name; registered under "offset.storage.topic" in the static initializer. */
    private static final String TOPIC = "connect-offsets";
    /** Partition count registered under "offset.storage.partitions" in the static initializer. */
    private static final short TOPIC_PARTITIONS = 2;
    /** Replication factor of the offsets topic used by these tests. */
    private static final short TOPIC_REPLICATION_FACTOR = 5;
    /** Baseline worker properties; populated once in the static initializer. */
    private static final Map<String, String> DEFAULT_PROPS = new HashMap<>();
    /** Sample offset map (one real entry plus a null/null entry); populated in the static initializer. */
    private static final Map<ByteBuffer, ByteBuffer> FIRST_SET;
    // Keys and values of the offsets written/read by the tests; assigned in the static initializer.
    private static final ByteBuffer TP0_KEY;
    private static final ByteBuffer TP1_KEY;
    private static final ByteBuffer TP2_KEY;
    private static final ByteBuffer TP0_VALUE;
    private static final ByteBuffer TP1_VALUE;
    private static final ByteBuffer TP2_VALUE;
    // Replacement values used to check that later records overwrite earlier ones.
    private static final ByteBuffer TP0_VALUE_NEW;
    private static final ByteBuffer TP1_VALUE_NEW;
    /** Per-test mutable copy of {@link #DEFAULT_PROPS}, so one test's overrides never leak into another. */
    private final Map<String, String> props = new HashMap<>(DEFAULT_PROPS);

    // Mocked log that the spied store receives from createKafkaBasedLog(...); tests verify
    // start/stop/send/readToEnd interactions against it.
    @Mock
    KafkaBasedLog<byte[], byte[]> storeLog;

    // Mocked key converter passed to the store's constructor in setup(...); only stubbed
    // when setup is called with true.
    @Mock
    Converter keyConverter;
    // Store under test; a Mockito spy assigned by setup(...) (or directly in testConnectorPartitions).
    private KafkaOffsetBackingStore store;

    // The captors below record the arguments the store passes to createKafkaBasedLog(...).
    @Captor
    private ArgumentCaptor<String> capturedTopic;

    @Captor
    private ArgumentCaptor<Map<String, Object>> capturedProducerProps;

    @Captor
    private ArgumentCaptor<Map<String, Object>> capturedConsumerProps;

    @Captor
    private ArgumentCaptor<Supplier<TopicAdmin>> capturedAdminSupplier;

    @Captor
    private ArgumentCaptor<NewTopic> capturedNewTopic;

    // Callback the store registers for consumed records; tests invoke it to simulate
    // records arriving from the offsets topic.
    @Captor
    private ArgumentCaptor<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback;

    // Captures the callback handed to storeLog.readToEnd(...), so stubbed answers can complete it.
    @Captor
    private ArgumentCaptor<Callback<Void>> storeLogCallbackArgumentCaptor;

    /**
     * Creates the spied store under test and redirects its {@code createKafkaBasedLog} call
     * to the mocked {@link #storeLog}, capturing every argument for later assertions.
     *
     * @param bool when {@code true}, the mocked key converter is stubbed to deserialize any
     *             key into a {@code [connectorName, partitionMap]} list; when {@code false}
     *             the converter is left unstubbed
     */
    public void setup(Boolean bool) {
        if (bool) {
            final SchemaAndValue deserializedKey = new SchemaAndValue(
                    (Schema) null,
                    Arrays.asList(WorkerConnectorTest.CONNECTOR, Collections.singletonMap("partitionKey", "dummy")));
            Mockito.when(this.keyConverter.toConnectData(ArgumentMatchers.any(), ArgumentMatchers.any()))
                    .thenReturn(deserializedKey);
        }

        // The admin client must never be requested by these tests.
        final Supplier<TopicAdmin> adminSupplier = () -> {
            Assertions.fail("Should not attempt to instantiate admin in these tests");
            throw new AssertionError();
        };
        final Supplier<String> clientIdSupplier = () -> CLIENT_ID_BASE;

        this.store = Mockito.spy(new KafkaOffsetBackingStore(adminSupplier, clientIdSupplier, this.keyConverter));
        Mockito.doReturn(this.storeLog)
                .when(this.store)
                .createKafkaBasedLog(
                        this.capturedTopic.capture(),
                        this.capturedProducerProps.capture(),
                        this.capturedConsumerProps.capture(),
                        this.capturedConsumedCallback.capture(),
                        this.capturedNewTopic.capture(),
                        this.capturedAdminSupplier.capture(),
                        ArgumentMatchers.any(),
                        ArgumentMatchers.any());
    }

    /** After each test, fails if the mocked log saw any interaction the test did not verify. */
    @AfterEach
    public void tearDown() {
        Mockito.verifyNoMoreInteractions(this.storeLog);
    }

    /**
     * Wraps a {@link DistributedConfig} built from {@code map} in a spy whose
     * {@code kafkaClusterId()} is stubbed to return {@code "test-cluster"}.
     *
     * @param map worker properties used to construct the config
     * @return the spied config
     */
    private DistributedConfig mockConfig(Map<String, String> map) {
        final DistributedConfig spiedConfig = Mockito.spy(new DistributedConfig(map));
        Mockito.doReturn("test-cluster").when(spiedConfig).kafkaClusterId();
        return spiedConfig;
    }

    /**
     * Configures, starts and stops the store, checking that the log is started/stopped and that
     * the topic name, (de)serializer settings and topic description passed to
     * {@code createKafkaBasedLog} match the configured values.
     */
    @Test
    public void testStartStop() {
        setup(false);
        this.props.put("offset.storage.min.insync.replicas", "3");
        this.props.put("offset.storage.max.message.bytes", "1001");

        final DistributedConfig config = mockConfig(this.props);
        this.store.configure(config);
        this.store.start();
        Mockito.verify(this.storeLog).start();

        Assertions.assertEquals(TOPIC, this.capturedTopic.getValue());

        final Map<String, Object> producerProps = this.capturedProducerProps.getValue();
        Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", producerProps.get("key.serializer"));
        Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", producerProps.get("value.serializer"));

        final Map<String, Object> consumerProps = this.capturedConsumerProps.getValue();
        Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", consumerProps.get("key.deserializer"));
        Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", consumerProps.get("value.deserializer"));

        final NewTopic topicDescription = this.capturedNewTopic.getValue();
        Assertions.assertEquals(TOPIC, topicDescription.name());
        Assertions.assertEquals(TOPIC_PARTITIONS, topicDescription.numPartitions());
        Assertions.assertEquals(TOPIC_REPLICATION_FACTOR, topicDescription.replicationFactor());

        this.store.stop();
        Mockito.verify(this.storeLog).stop();
    }

    /**
     * Simulates four records being consumed while the log starts (two per key) and checks that
     * the store's in-memory data holds the later value for each key.
     */
    @Test
    public void testReloadOnStart() {
        setup(true);

        // The consumed-record callback is only captured during configure(), so it has to be
        // looked up lazily inside the answer rather than when the stub is declared.
        Mockito.doAnswer(invocation -> {
            final Callback<ConsumerRecord<byte[], byte[]>> consumed = this.capturedConsumedCallback.getValue();
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    TP0_KEY.array(), TP0_VALUE.array(), new RecordHeaders(), Optional.empty()));
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    TP1_KEY.array(), TP1_VALUE.array(), new RecordHeaders(), Optional.empty()));
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    TP0_KEY.array(), TP0_VALUE_NEW.array(), new RecordHeaders(), Optional.empty()));
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    TP1_KEY.array(), TP1_VALUE_NEW.array(), new RecordHeaders(), Optional.empty()));
            return null;
        }).when(this.storeLog).start();

        this.store.configure(mockConfig(this.props));
        this.store.start();

        final Map<ByteBuffer, ByteBuffer> loaded = this.store.data;
        Assertions.assertEquals(TP0_VALUE_NEW, loaded.get(TP0_KEY));
        Assertions.assertEquals(TP1_VALUE_NEW, loaded.get(TP1_KEY));

        this.store.stop();
        Mockito.verify(this.storeLog).stop();
    }

    /**
     * Walks through a get / set / get / get cycle: an initially empty read, a write whose
     * callback fires only after both producer sends complete, a read that sees the written
     * values, and a read that sees values replaced by newer records.
     */
    @Test
    public void testGetSet() throws Exception {
        setup(true);
        this.store.configure(mockConfig(this.props));
        this.store.start();
        Mockito.verify(this.storeLog).start();

        // First read: the log reaches its end without delivering any records.
        Mockito.doAnswer(invocation -> {
            this.storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
            return null;
        }).when(this.storeLog).readToEnd(this.storeLogCallbackArgumentCaptor.capture());
        final Map<ByteBuffer, ByteBuffer> emptyRead =
                this.store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000L, TimeUnit.MILLISECONDS);
        Assertions.assertNull(emptyRead.get(TP0_KEY));
        Assertions.assertNull(emptyRead.get(TP1_KEY));

        // Write two offsets; the user callback must not fire until both sends are acknowledged.
        final Map<ByteBuffer, ByteBuffer> toWrite = new HashMap<>();
        toWrite.put(TP0_KEY, TP0_VALUE);
        toWrite.put(TP1_KEY, TP1_VALUE);
        final AtomicBoolean setCallbackInvoked = new AtomicBoolean(false);
        final Future<Void> setFuture = this.store.set(toWrite, (error, result) -> setCallbackInvoked.set(true));
        Assertions.assertFalse(setFuture.isDone());

        final ArgumentCaptor<org.apache.kafka.clients.producer.Callback> tp0Send =
                ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
        Mockito.verify(this.storeLog).send(
                AdditionalMatchers.aryEq(TP0_KEY.array()), AdditionalMatchers.aryEq(TP0_VALUE.array()), tp0Send.capture());
        final ArgumentCaptor<org.apache.kafka.clients.producer.Callback> tp1Send =
                ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
        Mockito.verify(this.storeLog).send(
                AdditionalMatchers.aryEq(TP1_KEY.array()), AdditionalMatchers.aryEq(TP1_VALUE.array()), tp1Send.capture());

        // Acknowledge out of order: the second send first, then the first.
        tp1Send.getValue().onCompletion(null, null);
        Assertions.assertFalse(setCallbackInvoked.get());
        tp0Send.getValue().onCompletion(null, null);
        setFuture.get(10000L, TimeUnit.MILLISECONDS);
        Assertions.assertTrue(setCallbackInvoked.get());

        // Second read: the written records are delivered before the end-of-log callback.
        Mockito.doAnswer(invocation -> {
            final Callback<ConsumerRecord<byte[], byte[]>> consumed = this.capturedConsumedCallback.getValue();
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    TP0_KEY.array(), TP0_VALUE.array(), new RecordHeaders(), Optional.empty()));
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    TP1_KEY.array(), TP1_VALUE.array(), new RecordHeaders(), Optional.empty()));
            this.storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
            return null;
        }).when(this.storeLog).readToEnd(this.storeLogCallbackArgumentCaptor.capture());
        final Map<ByteBuffer, ByteBuffer> afterWrite =
                this.store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000L, TimeUnit.MILLISECONDS);
        Assertions.assertEquals(TP0_VALUE, afterWrite.get(TP0_KEY));
        Assertions.assertEquals(TP1_VALUE, afterWrite.get(TP1_KEY));

        // Third read: newer records for the same keys replace the previous values.
        Mockito.doAnswer(invocation -> {
            final Callback<ConsumerRecord<byte[], byte[]>> consumed = this.capturedConsumedCallback.getValue();
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    TP0_KEY.array(), TP0_VALUE_NEW.array(), new RecordHeaders(), Optional.empty()));
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    TP1_KEY.array(), TP1_VALUE_NEW.array(), new RecordHeaders(), Optional.empty()));
            this.storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
            return null;
        }).when(this.storeLog).readToEnd(this.storeLogCallbackArgumentCaptor.capture());
        final Map<ByteBuffer, ByteBuffer> afterUpdate =
                this.store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000L, TimeUnit.MILLISECONDS);
        Assertions.assertEquals(TP0_VALUE_NEW, afterUpdate.get(TP0_KEY));
        Assertions.assertEquals(TP1_VALUE_NEW, afterUpdate.get(TP1_KEY));

        this.store.stop();
        Mockito.verify(this.storeLog).stop();
    }

    /**
     * Exercises null handling: a null key is written and read back like any other key, and
     * writing a null value for a key results in that key being absent from the store's data
     * once the corresponding record has been consumed.
     */
    @Test
    public void testGetSetNull() throws Exception {
        setup(true);

        // Stubbed up front; it is triggered by the first store.get(...) further down.
        Mockito.doAnswer(invocation -> {
            final Callback<ConsumerRecord<byte[], byte[]>> consumed = this.capturedConsumedCallback.getValue();
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    (byte[]) null, TP0_VALUE.array(), new RecordHeaders(), Optional.empty()));
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    TP1_KEY.array(), TP1_VALUE.array(), new RecordHeaders(), Optional.empty()));
            this.storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
            return null;
        }).when(this.storeLog).readToEnd(this.storeLogCallbackArgumentCaptor.capture());

        this.store.configure(mockConfig(this.props));
        this.store.start();
        Mockito.verify(this.storeLog).start();

        // Write one entry with a null key and one regular entry.
        final Map<ByteBuffer, ByteBuffer> toWrite = new HashMap<>();
        toWrite.put(null, TP0_VALUE);
        toWrite.put(TP1_KEY, TP1_VALUE);
        final AtomicBoolean setCallbackInvoked = new AtomicBoolean(false);
        final Future<Void> firstSet = this.store.set(toWrite, (error, result) -> setCallbackInvoked.set(true));
        Assertions.assertFalse(firstSet.isDone());

        final ArgumentCaptor<org.apache.kafka.clients.producer.Callback> nullKeySend =
                ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
        Mockito.verify(this.storeLog).send(
                ArgumentMatchers.isNull(), AdditionalMatchers.aryEq(TP0_VALUE.array()), nullKeySend.capture());
        final ArgumentCaptor<org.apache.kafka.clients.producer.Callback> tp1Send =
                ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
        Mockito.verify(this.storeLog).send(
                AdditionalMatchers.aryEq(TP1_KEY.array()), AdditionalMatchers.aryEq(TP1_VALUE.array()), tp1Send.capture());

        // The user callback only fires once both sends have been acknowledged.
        tp1Send.getValue().onCompletion(null, null);
        Assertions.assertFalse(setCallbackInvoked.get());
        nullKeySend.getValue().onCompletion(null, null);
        firstSet.get(10000L, TimeUnit.MILLISECONDS);
        Assertions.assertTrue(setCallbackInvoked.get());

        final Map<ByteBuffer, ByteBuffer> readBack =
                this.store.get(Arrays.asList(null, TP1_KEY)).get(10000L, TimeUnit.MILLISECONDS);
        Assertions.assertEquals(TP0_VALUE, readBack.get(null));
        Assertions.assertEquals(TP1_VALUE, readBack.get(TP1_KEY));

        // Now write a null value for TP1_KEY.
        toWrite.clear();
        toWrite.put(TP1_KEY, null);
        setCallbackInvoked.set(false);
        final Future<Void> secondSet = this.store.set(toWrite, (error, result) -> setCallbackInvoked.set(true));
        Assertions.assertFalse(secondSet.isDone());

        final ArgumentCaptor<org.apache.kafka.clients.producer.Callback> nullValueSend =
                ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
        Mockito.verify(this.storeLog).send(
                AdditionalMatchers.aryEq(TP1_KEY.array()), ArgumentMatchers.isNull(), nullValueSend.capture());
        nullValueSend.getValue().onCompletion(null, null);
        Assertions.assertTrue(setCallbackInvoked.get());

        // Deliver the null-valued record on the next read.
        Mockito.doAnswer(invocation -> {
            this.capturedConsumedCallback.getValue().onCompletion(null,
                    new ConsumerRecord<>(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0,
                            TP1_KEY.array(), (byte[]) null, new RecordHeaders(), Optional.empty()));
            this.storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
            return null;
        }).when(this.storeLog).readToEnd(this.storeLogCallbackArgumentCaptor.capture());

        final Map<ByteBuffer, ByteBuffer> afterNullValue =
                this.store.get(Collections.singletonList(TP1_KEY)).get(10000L, TimeUnit.MILLISECONDS);
        Assertions.assertNull(afterNullValue.get(TP1_KEY));
        Assertions.assertFalse(this.store.data.containsKey(TP1_KEY));

        this.store.stop();
        Mockito.verify(this.storeLog).stop();
    }

    /**
     * Writes three offsets and fails one of the producer sends. Checks that the user callback
     * fires exactly when the failure is reported (carrying the error), and that waiting on the
     * returned future throws an {@link ExecutionException} caused by the {@link KafkaException}.
     */
    @Test
    public void testSetFailure() {
        setup(false);
        this.store.configure(mockConfig(this.props));
        this.store.start();
        Mockito.verify(this.storeLog).start();

        final Map<ByteBuffer, ByteBuffer> toWrite = new HashMap<>();
        toWrite.put(TP0_KEY, TP0_VALUE);
        toWrite.put(TP1_KEY, TP1_VALUE);
        toWrite.put(TP2_KEY, TP2_VALUE);

        final AtomicBoolean callbackInvoked = new AtomicBoolean(false);
        final AtomicBoolean callbackSawError = new AtomicBoolean(false);
        final Future<Void> setFuture = this.store.set(toWrite, (error, result) -> {
            callbackInvoked.set(true);
            if (error != null) {
                callbackSawError.set(true);
            }
        });
        Assertions.assertFalse(setFuture.isDone());

        final ArgumentCaptor<org.apache.kafka.clients.producer.Callback> tp0Send =
                ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
        Mockito.verify(this.storeLog).send(
                AdditionalMatchers.aryEq(TP0_KEY.array()), AdditionalMatchers.aryEq(TP0_VALUE.array()), tp0Send.capture());
        final ArgumentCaptor<org.apache.kafka.clients.producer.Callback> tp1Send =
                ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
        Mockito.verify(this.storeLog).send(
                AdditionalMatchers.aryEq(TP1_KEY.array()), AdditionalMatchers.aryEq(TP1_VALUE.array()), tp1Send.capture());
        final ArgumentCaptor<org.apache.kafka.clients.producer.Callback> tp2Send =
                ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
        Mockito.verify(this.storeLog).send(
                AdditionalMatchers.aryEq(TP2_KEY.array()), AdditionalMatchers.aryEq(TP2_VALUE.array()), tp2Send.capture());

        // One success does not complete the write...
        tp1Send.getValue().onCompletion(null, null);
        Assertions.assertFalse(callbackInvoked.get());

        // ...but the first failure does, and it is reported to the user callback.
        tp2Send.getValue().onCompletion(null, new KafkaException("bogus error"));
        Assertions.assertTrue(callbackInvoked.get());
        Assertions.assertTrue(callbackSawError.get());

        // A late success for the remaining send must not change the outcome.
        tp0Send.getValue().onCompletion(null, null);

        // The future has to be awaited inside the executable; an empty lambda never throws
        // and would make assertThrows fail unconditionally.
        final ExecutionException thrown = Assertions.assertThrows(
                ExecutionException.class,
                () -> setFuture.get(10000L, TimeUnit.MILLISECONDS));
        Assertions.assertNotNull(thrown.getCause());
        Assertions.assertInstanceOf(KafkaException.class, thrown.getCause());

        this.store.stop();
        Mockito.verify(this.storeLog).stop();
    }

    /**
     * With exactly-once source support enabled and no isolation level set in the worker
     * properties, the consumer properties passed to the log must contain read-committed isolation.
     */
    @Test
    public void testConsumerPropertiesInsertedByDefaultWithExactlyOnceSourceEnabled() {
        setup(false);
        this.props.put("exactly.once.source.support", "enabled");
        this.props.remove("isolation.level");

        this.store.configure(mockConfig(this.props));

        final Map<String, Object> consumerProps = this.capturedConsumerProps.getValue();
        final String expectedIsolation = IsolationLevel.READ_COMMITTED.toString();
        Assertions.assertEquals(expectedIsolation, consumerProps.get("isolation.level"));
    }

    /**
     * With exactly-once source support enabled, an "isolation.level" of read-uncommitted put
     * into the worker properties must still result in read-committed consumer properties.
     */
    @Test
    public void testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() {
        setup(false);
        this.props.put("exactly.once.source.support", "enabled");
        this.props.put("isolation.level", IsolationLevel.READ_UNCOMMITTED.toString());

        this.store.configure(mockConfig(this.props));

        final Map<String, Object> consumerProps = this.capturedConsumerProps.getValue();
        final String expectedIsolation = IsolationLevel.READ_COMMITTED.toString();
        Assertions.assertEquals(expectedIsolation, consumerProps.get("isolation.level"));
    }

    /**
     * With exactly-once source support disabled and no isolation level set in the worker
     * properties, the consumer properties passed to the log must not contain an isolation level.
     */
    @Test
    public void testConsumerPropertiesNotInsertedByDefaultWithoutExactlyOnceSourceEnabled() {
        setup(false);
        this.props.put("exactly.once.source.support", "disabled");
        this.props.remove("isolation.level");

        this.store.configure(mockConfig(this.props));

        final Map<String, Object> consumerProps = this.capturedConsumerProps.getValue();
        Assertions.assertNull(consumerProps.get("isolation.level"));
    }

    /**
     * With exactly-once source support disabled, an "isolation.level" of read-uncommitted put
     * into the worker properties must reach the consumer properties unchanged.
     */
    @Test
    public void testConsumerPropertiesDoNotOverrideUserSuppliedValuesWithoutExactlyOnceSourceEnabled() {
        setup(false);
        final String userIsolation = IsolationLevel.READ_UNCOMMITTED.toString();
        this.props.put("exactly.once.source.support", "disabled");
        this.props.put("isolation.level", userIsolation);

        this.store.configure(mockConfig(this.props));

        final Map<String, Object> consumerProps = this.capturedConsumerProps.getValue();
        Assertions.assertEquals(IsolationLevel.READ_UNCOMMITTED.toString(), consumerProps.get("isolation.level"));
    }

    /**
     * Checks that both the producer and the consumer properties carry the client id
     * {@code "test-client-id-offsets"} after the store is configured.
     */
    @Test
    public void testClientIds() {
        setup(false);
        this.store.configure(mockConfig(this.props));

        final String expectedClientId = "test-client-id-offsets";
        final Map<String, Object> producerProps = this.capturedProducerProps.getValue();
        final Map<String, Object> consumerProps = this.capturedConsumerProps.getValue();

        Assertions.assertEquals(expectedClientId, producerProps.get("client.id"));
        Assertions.assertEquals(expectedClientId, consumerProps.get("client.id"));
    }

    /**
     * Uses a real schemaless {@link JsonConverter} for keys and checks that
     * {@code connectorPartitions(...)} reports the distinct partitions seen per connector, and
     * that a record with a null value removes its partition from the reported set.
     */
    @Test
    public void testConnectorPartitions() throws Exception {
        final JsonConverter jsonKeyConverter = new JsonConverter();
        jsonKeyConverter.configure(Collections.singletonMap("schemas.enable", "false"), true);

        final Supplier<TopicAdmin> adminSupplier = () -> {
            Assertions.fail("Should not attempt to instantiate admin in these tests");
            return null;
        };
        final Supplier<String> clientIdSupplier = () -> CLIENT_ID_BASE;
        this.store = Mockito.spy(new KafkaOffsetBackingStore(adminSupplier, clientIdSupplier, jsonKeyConverter));
        Mockito.doReturn(this.storeLog)
                .when(this.store)
                .createKafkaBasedLog(
                        this.capturedTopic.capture(),
                        this.capturedProducerProps.capture(),
                        this.capturedConsumerProps.capture(),
                        this.capturedConsumedCallback.capture(),
                        this.capturedNewTopic.capture(),
                        this.capturedAdminSupplier.capture(),
                        ArgumentMatchers.any(),
                        ArgumentMatchers.any());

        this.store.configure(mockConfig(this.props));
        this.store.start();
        Mockito.verify(this.storeLog).start();

        final Map<String, Object> connector1Partition1 = Collections.singletonMap("partitionKey", "partitionValue1");
        final Map<String, Object> connector1Partition2 = Collections.singletonMap("partitionKey", "partitionValue2");
        final Map<String, Object> connector2Partition = Collections.singletonMap("partitionKey", "partitionValue");

        // Four records: connector1/partition1 twice, connector1/partition2 once, connector2 once.
        Mockito.doAnswer(invocation -> {
            final Callback<ConsumerRecord<byte[], byte[]>> consumed = this.capturedConsumedCallback.getValue();
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    jsonKeyConverter.fromConnectData("", (Schema) null, Arrays.asList("connector1", connector1Partition1)),
                    TP0_VALUE.array(), new RecordHeaders(), Optional.empty()));
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    jsonKeyConverter.fromConnectData("", (Schema) null, Arrays.asList("connector1", connector1Partition1)),
                    TP1_VALUE.array(), new RecordHeaders(), Optional.empty()));
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 2L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    jsonKeyConverter.fromConnectData("", (Schema) null, Arrays.asList("connector1", connector1Partition2)),
                    TP2_VALUE.array(), new RecordHeaders(), Optional.empty()));
            consumed.onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 3L, 0L, TimestampType.CREATE_TIME, 0, 0,
                    jsonKeyConverter.fromConnectData("", (Schema) null, Arrays.asList("connector2", connector2Partition)),
                    TP1_VALUE.array(), new RecordHeaders(), Optional.empty()));
            this.storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
            return null;
        }).when(this.storeLog).readToEnd(this.storeLogCallbackArgumentCaptor.capture());

        // An empty get() is enough to trigger readToEnd and with it the stubbed records.
        this.store.get(Collections.emptyList()).get(10000L, TimeUnit.MILLISECONDS);

        final Set<Map<String, Object>> actualConnector1 = this.store.connectorPartitions("connector1");
        final Set<Map<String, Object>> expectedConnector1 = new HashSet<>();
        expectedConnector1.add(connector1Partition1);
        expectedConnector1.add(connector1Partition2);
        Assertions.assertEquals(expectedConnector1, actualConnector1);
        Assertions.assertEquals(Collections.singleton(connector2Partition), this.store.connectorPartitions("connector2"));

        // A null-valued record for connector1/partition1.
        Mockito.doAnswer(invocation -> {
            this.capturedConsumedCallback.getValue().onCompletion(null,
                    new ConsumerRecord<>(TOPIC, 0, 4L, 0L, TimestampType.CREATE_TIME, 0, 0,
                            jsonKeyConverter.fromConnectData("", (Schema) null, Arrays.asList("connector1", connector1Partition1)),
                            (byte[]) null, new RecordHeaders(), Optional.empty()));
            this.storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
            return null;
        }).when(this.storeLog).readToEnd(this.storeLogCallbackArgumentCaptor.capture());

        this.store.get(Collections.emptyList()).get(10000L, TimeUnit.MILLISECONDS);
        Assertions.assertEquals(Collections.singleton(connector1Partition2), this.store.connectorPartitions("connector1"));

        this.store.stop();
        Mockito.verify(this.storeLog).stop();
    }

    /**
     * Wraps the UTF-8 encoding of {@code str} in a {@link ByteBuffer}.
     *
     * <p>The charset is fixed so the fixture bytes do not depend on the platform default.
     *
     * @param str text to encode; must not be {@code null}
     * @return a buffer backed by the encoded bytes
     */
    private static ByteBuffer buffer(String str) {
        return ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));
    }

    static {
        DEFAULT_PROPS.put("bootstrap.servers", "broker1:9092,broker2:9093");
        DEFAULT_PROPS.put("offset.storage.topic", TOPIC);
        DEFAULT_PROPS.put("offset.storage.replication.factor", Short.toString((short) 5));
        DEFAULT_PROPS.put("offset.storage.partitions", Integer.toString(TOPIC_PARTITIONS));
        DEFAULT_PROPS.put("config.storage.topic", "connect-configs");
        DEFAULT_PROPS.put("config.storage.replication.factor", Short.toString((short) 5));
        DEFAULT_PROPS.put("group.id", "connect");
        DEFAULT_PROPS.put("status.storage.topic", "status-topic");
        DEFAULT_PROPS.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        DEFAULT_PROPS.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        FIRST_SET = new HashMap();
        FIRST_SET.put(buffer("key"), buffer("value"));
        FIRST_SET.put(null, null);
        TP0_KEY = buffer("TP0KEY");
        TP1_KEY = buffer("TP1KEY");
        TP2_KEY = buffer("TP2KEY");
        TP0_VALUE = buffer("VAL0");
        TP1_VALUE = buffer("VAL1");
        TP2_VALUE = buffer("VAL2");
        TP0_VALUE_NEW = buffer("VAL0_NEW");
        TP1_VALUE_NEW = buffer("VAL1_NEW");
    }
}
