package org.apache.kafka.connect.storage;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.class */
public class KafkaStatusBackingStoreTest {
    private static final String STATUS_TOPIC = "status-topic";
    private static final String WORKER_ID = "localhost:8083";
    private static final String CONNECTOR = "conn";
    private static final ConnectorTaskId TASK = new ConnectorTaskId(CONNECTOR, 0);
    private KafkaStatusBackingStore store;
    private final KafkaBasedLog<String, byte[]> kafkaBasedLog = (KafkaBasedLog) Mockito.mock(KafkaBasedLog.class);
    Converter converter = (Converter) Mockito.mock(Converter.class);
    WorkerConfig workerConfig = (WorkerConfig) Mockito.mock(WorkerConfig.class);

    @Before
    public void setup() {
        this.store = new KafkaStatusBackingStore(new MockTime(), this.converter, STATUS_TOPIC, () -> {
            return null;
        }, this.kafkaBasedLog);
    }

    @Test
    public void misconfigurationOfStatusBackingStore() {
        Mockito.when(this.workerConfig.getString("status.storage.topic")).thenReturn((Object) null);
        Mockito.when(this.workerConfig.getString("status.storage.topic")).thenReturn("   ");
        Assert.assertEquals("Must specify topic for connector status.", ((Exception) Assert.assertThrows(ConfigException.class, () -> {
            this.store.configure(this.workerConfig);
        })).getMessage());
        Assert.assertEquals("Must specify topic for connector status.", ((Exception) Assert.assertThrows(ConfigException.class, () -> {
            this.store.configure(this.workerConfig);
        })).getMessage());
    }

    @Test
    public void putConnectorState() {
        byte[] bArr = new byte[0];
        Mockito.when(this.converter.fromConnectData((String) Mockito.eq(STATUS_TOPIC), (Schema) Mockito.any(Schema.class), Mockito.any(Struct.class))).thenReturn(bArr);
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArgument(2)).onCompletion((RecordMetadata) null, (Exception) null);
            return null;
        }).when(this.kafkaBasedLog)).send(Mockito.eq("status-connector-conn"), Mockito.eq(bArr), (Callback) Mockito.any(Callback.class));
        this.store.put(new ConnectorStatus(CONNECTOR, AbstractStatus.State.RUNNING, WORKER_ID, 0));
        Assert.assertNull(this.store.get(CONNECTOR));
    }

    @Test
    public void putConnectorStateRetriableFailure() {
        byte[] bArr = new byte[0];
        Mockito.when(this.converter.fromConnectData((String) Mockito.eq(STATUS_TOPIC), (Schema) Mockito.any(Schema.class), Mockito.any(Struct.class))).thenReturn(bArr);
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArgument(2)).onCompletion((RecordMetadata) null, new TimeoutException());
            return null;
        }).doAnswer(invocationOnMock2 -> {
            ((Callback) invocationOnMock2.getArgument(2)).onCompletion((RecordMetadata) null, (Exception) null);
            return null;
        }).when(this.kafkaBasedLog)).send(Mockito.eq("status-connector-conn"), Mockito.eq(bArr), (Callback) Mockito.any(Callback.class));
        this.store.put(new ConnectorStatus(CONNECTOR, AbstractStatus.State.RUNNING, WORKER_ID, 0));
        Assert.assertNull(this.store.get(CONNECTOR));
    }

    @Test
    public void putConnectorStateNonRetriableFailure() {
        byte[] bArr = new byte[0];
        Mockito.when(this.converter.fromConnectData((String) Mockito.eq(STATUS_TOPIC), (Schema) Mockito.any(Schema.class), Mockito.any(Struct.class))).thenReturn(bArr);
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArgument(2)).onCompletion((RecordMetadata) null, new UnknownServerException());
            return null;
        }).when(this.kafkaBasedLog)).send(Mockito.eq("status-connector-conn"), Mockito.eq(bArr), (Callback) Mockito.any(Callback.class));
        this.store.put(new ConnectorStatus(CONNECTOR, AbstractStatus.State.RUNNING, WORKER_ID, 0));
        Assert.assertNull(this.store.get(CONNECTOR));
    }

    @Test
    public void putSafeConnectorIgnoresStaleStatus() {
        byte[] bArr = new byte[0];
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", "anotherhost:8083");
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 1L);
        Mockito.when(this.converter.toConnectData(STATUS_TOPIC, bArr)).thenReturn(new SchemaAndValue((Schema) null, hashMap));
        this.store.read(consumerRecord(0L, "status-connector-conn", bArr));
        this.store.putSafe(new ConnectorStatus(CONNECTOR, AbstractStatus.State.UNASSIGNED, WORKER_ID, 0));
        ((KafkaBasedLog) Mockito.verify(this.kafkaBasedLog, Mockito.never())).send(ArgumentMatchers.anyString(), Mockito.any(), (Callback) Mockito.any(Callback.class));
        Assert.assertEquals(new ConnectorStatus(CONNECTOR, AbstractStatus.State.RUNNING, "anotherhost:8083", 1), this.store.get(CONNECTOR));
    }

    @Test
    public void putSafeWithNoPreviousValueIsPropagated() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Struct.class);
        this.kafkaBasedLog.send(Mockito.eq("status-connector-conn"), Mockito.eq(new byte[0]), (Callback) Mockito.any(Callback.class));
        ConnectorStatus connectorStatus = new ConnectorStatus(CONNECTOR, AbstractStatus.State.FAILED, WORKER_ID, 0);
        this.store.putSafe(connectorStatus);
        ((Converter) Mockito.verify(this.converter)).fromConnectData((String) Mockito.eq(STATUS_TOPIC), (Schema) Mockito.any(Schema.class), forClass.capture());
        Assert.assertEquals(connectorStatus.state().toString(), ((Struct) forClass.getValue()).get("state"));
        Assert.assertEquals(connectorStatus.workerId(), ((Struct) forClass.getValue()).get("worker_id"));
        Assert.assertEquals(Integer.valueOf(connectorStatus.generation()), ((Struct) forClass.getValue()).get("generation"));
    }

    @Test
    public void putSafeOverridesValueSetBySameWorker() {
        byte[] bArr = new byte[0];
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", WORKER_ID);
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 1L);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("worker_id", WORKER_ID);
        hashMap2.put("state", "UNASSIGNED");
        hashMap2.put("generation", 0L);
        Mockito.when(this.converter.toConnectData(STATUS_TOPIC, bArr)).thenReturn(new SchemaAndValue((Schema) null, hashMap)).thenReturn(new SchemaAndValue((Schema) null, hashMap2));
        Mockito.when(this.converter.fromConnectData((String) Mockito.eq(STATUS_TOPIC), (Schema) Mockito.any(Schema.class), Mockito.any(Struct.class))).thenReturn(bArr);
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArgument(2)).onCompletion((RecordMetadata) null, (Exception) null);
            this.store.read(consumerRecord(1L, "status-connector-conn", bArr));
            return null;
        }).when(this.kafkaBasedLog)).send(Mockito.eq("status-connector-conn"), Mockito.eq(bArr), (Callback) Mockito.any(Callback.class));
        this.store.read(consumerRecord(0L, "status-connector-conn", bArr));
        this.store.putSafe(new ConnectorStatus(CONNECTOR, AbstractStatus.State.UNASSIGNED, WORKER_ID, 0));
        Assert.assertEquals(new ConnectorStatus(CONNECTOR, AbstractStatus.State.UNASSIGNED, WORKER_ID, 0), this.store.get(CONNECTOR));
    }

    @Test
    public void putConnectorStateShouldOverride() {
        byte[] bArr = new byte[0];
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", "anotherhost:8083");
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 1L);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("worker_id", WORKER_ID);
        hashMap2.put("state", "UNASSIGNED");
        hashMap2.put("generation", 0L);
        Mockito.when(this.converter.toConnectData(STATUS_TOPIC, bArr)).thenReturn(new SchemaAndValue((Schema) null, hashMap)).thenReturn(new SchemaAndValue((Schema) null, hashMap2));
        Mockito.when(this.converter.fromConnectData((String) Mockito.eq(STATUS_TOPIC), (Schema) Mockito.any(Schema.class), Mockito.any(Struct.class))).thenReturn(bArr);
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArgument(2)).onCompletion((RecordMetadata) null, (Exception) null);
            this.store.read(consumerRecord(1L, "status-connector-conn", bArr));
            return null;
        }).when(this.kafkaBasedLog)).send(Mockito.eq("status-connector-conn"), Mockito.eq(bArr), (Callback) Mockito.any(Callback.class));
        this.store.read(consumerRecord(0L, "status-connector-conn", bArr));
        ConnectorStatus connectorStatus = new ConnectorStatus(CONNECTOR, AbstractStatus.State.UNASSIGNED, WORKER_ID, 0);
        this.store.put(connectorStatus);
        Assert.assertEquals(connectorStatus, this.store.get(CONNECTOR));
    }

    @Test
    public void readConnectorState() {
        byte[] bArr = new byte[0];
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", WORKER_ID);
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 0L);
        Mockito.when(this.converter.toConnectData(STATUS_TOPIC, bArr)).thenReturn(new SchemaAndValue((Schema) null, hashMap));
        this.store.read(consumerRecord(0L, "status-connector-conn", bArr));
        Assert.assertEquals(new ConnectorStatus(CONNECTOR, AbstractStatus.State.RUNNING, WORKER_ID, 0), this.store.get(CONNECTOR));
    }

    @Test
    public void putTaskState() {
        byte[] bArr = new byte[0];
        Mockito.when(this.converter.fromConnectData((String) Mockito.eq(STATUS_TOPIC), (Schema) Mockito.any(Schema.class), Mockito.any(Struct.class))).thenReturn(bArr);
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArgument(2)).onCompletion((RecordMetadata) null, (Exception) null);
            this.store.read(consumerRecord(1L, "status-connector-conn", bArr));
            return null;
        }).when(this.kafkaBasedLog)).send(Mockito.eq("status-task-conn-0"), Mockito.eq(bArr), (Callback) Mockito.any(Callback.class));
        this.store.put(new TaskStatus(TASK, AbstractStatus.State.RUNNING, WORKER_ID, 0));
        Assert.assertNull(this.store.get(TASK));
    }

    @Test
    public void readTaskState() {
        byte[] bArr = new byte[0];
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", WORKER_ID);
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 0L);
        Mockito.when(this.converter.toConnectData(STATUS_TOPIC, bArr)).thenReturn(new SchemaAndValue((Schema) null, hashMap));
        this.store.read(consumerRecord(0L, "status-task-conn-0", bArr));
        Assert.assertEquals(new TaskStatus(TASK, AbstractStatus.State.RUNNING, WORKER_ID, 0), this.store.get(TASK));
    }

    @Test
    public void readTaskStateShouldIgnoreStaleStatusesFromOtherWorkers() {
        byte[] bArr = new byte[0];
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", "anotherhost:8083");
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 10L);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("worker_id", WORKER_ID);
        hashMap2.put("state", "UNASSIGNED");
        hashMap2.put("generation", 9L);
        HashMap hashMap3 = new HashMap();
        hashMap3.put("worker_id", "yetanotherhost:8083");
        hashMap3.put("state", "RUNNING");
        hashMap3.put("generation", 1L);
        Mockito.when(this.converter.toConnectData(STATUS_TOPIC, bArr)).thenReturn(new SchemaAndValue((Schema) null, hashMap)).thenReturn(new SchemaAndValue((Schema) null, hashMap2)).thenReturn(new SchemaAndValue((Schema) null, hashMap3));
        Mockito.when(this.converter.fromConnectData((String) Mockito.eq(STATUS_TOPIC), (Schema) Mockito.any(Schema.class), Mockito.any(Struct.class))).thenReturn(bArr);
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArgument(2)).onCompletion((RecordMetadata) null, (Exception) null);
            this.store.read(consumerRecord(2L, "status-task-conn-0", bArr));
            return null;
        }).when(this.kafkaBasedLog)).send(Mockito.eq("status-task-conn-0"), Mockito.eq(bArr), (Callback) Mockito.any(Callback.class));
        this.store.read(consumerRecord(0L, "status-task-conn-0", bArr));
        this.store.read(consumerRecord(1L, "status-task-conn-0", bArr));
        Assert.assertEquals(new TaskStatus(TASK, AbstractStatus.State.RUNNING, "anotherhost:8083", 10), this.store.get(TASK));
        TaskStatus taskStatus = new TaskStatus(TASK, AbstractStatus.State.RUNNING, "yetanotherhost:8083", 1);
        this.store.put(taskStatus);
        Assert.assertEquals(taskStatus, this.store.get(TASK));
    }

    @Test
    public void deleteConnectorState() {
        byte[] bArr = new byte[0];
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", WORKER_ID);
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 0L);
        Mockito.when(this.converter.fromConnectData((String) Mockito.eq(STATUS_TOPIC), (Schema) Mockito.any(Schema.class), Mockito.any(Struct.class))).thenReturn(bArr);
        Mockito.when(this.converter.toConnectData(STATUS_TOPIC, bArr)).thenReturn(new SchemaAndValue((Schema) null, hashMap));
        this.store.put(new ConnectorStatus(CONNECTOR, AbstractStatus.State.RUNNING, WORKER_ID, 0));
        TaskStatus taskStatus = new TaskStatus(TASK, AbstractStatus.State.RUNNING, WORKER_ID, 0);
        this.store.put(taskStatus);
        this.store.read(consumerRecord(0L, "status-task-conn-0", bArr));
        ((KafkaBasedLog) Mockito.verify(this.kafkaBasedLog)).send(Mockito.eq("status-connector-conn"), Mockito.eq(bArr), (Callback) Mockito.any(Callback.class));
        ((KafkaBasedLog) Mockito.verify(this.kafkaBasedLog)).send(Mockito.eq("status-task-conn-0"), Mockito.eq(bArr), (Callback) Mockito.any(Callback.class));
        Assert.assertEquals(new HashSet(Collections.singletonList(CONNECTOR)), this.store.connectors());
        Assert.assertEquals(new HashSet(Collections.singletonList(taskStatus)), new HashSet(this.store.getAll(CONNECTOR)));
        this.store.read(consumerRecord(0L, "status-connector-conn", null));
        Assert.assertTrue(this.store.connectors().isEmpty());
        Assert.assertTrue(this.store.getAll(CONNECTOR).isEmpty());
    }

    @Test
    public void deleteTaskState() {
        byte[] bArr = new byte[0];
        HashMap hashMap = new HashMap();
        hashMap.put("worker_id", WORKER_ID);
        hashMap.put("state", "RUNNING");
        hashMap.put("generation", 0L);
        Mockito.when(this.converter.fromConnectData((String) Mockito.eq(STATUS_TOPIC), (Schema) Mockito.any(Schema.class), Mockito.any(Struct.class))).thenReturn(bArr);
        Mockito.when(this.converter.toConnectData(STATUS_TOPIC, bArr)).thenReturn(new SchemaAndValue((Schema) null, hashMap));
        TaskStatus taskStatus = new TaskStatus(TASK, AbstractStatus.State.RUNNING, WORKER_ID, 0);
        this.store.put(taskStatus);
        this.store.read(consumerRecord(0L, "status-task-conn-0", bArr));
        ((KafkaBasedLog) Mockito.verify(this.kafkaBasedLog)).send(Mockito.eq("status-task-conn-0"), Mockito.eq(bArr), (Callback) Mockito.any(Callback.class));
        Assert.assertEquals(new HashSet(Collections.singletonList(taskStatus)), new HashSet(this.store.getAll(CONNECTOR)));
        this.store.read(consumerRecord(0L, "status-task-conn-0", null));
        Assert.assertTrue(this.store.getAll(CONNECTOR).isEmpty());
    }

    @Test
    public void testClientIds() {
        Supplier supplier = () -> {
            return (TopicAdmin) Mockito.mock(TopicAdmin.class);
        };
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Map.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Map.class);
        this.store = (KafkaStatusBackingStore) Mockito.spy(new KafkaStatusBackingStore(new MockTime(), this.converter, supplier, "test-client-id-"));
        ((KafkaStatusBackingStore) Mockito.doReturn((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class)).when(this.store)).createKafkaBasedLog((String) Mockito.any(), (Map) forClass.capture(), (Map) forClass2.capture(), (org.apache.kafka.connect.util.Callback) Mockito.any(), (NewTopic) Mockito.any(), (Supplier) Mockito.any(), (WorkerConfig) Mockito.any(), (Time) Mockito.any());
        Mockito.when(this.workerConfig.getString("status.storage.topic")).thenReturn("connect-statuses");
        this.store.configure(this.workerConfig);
        String str = "test-client-id-statuses";
        Assert.assertEquals(str, ((Map) forClass.getValue()).get("client.id"));
        Assert.assertEquals(str, ((Map) forClass2.getValue()).get("client.id"));
    }

    private static ConsumerRecord<String, byte[]> consumerRecord(long j, String str, byte[] bArr) {
        return new ConsumerRecord<>(STATUS_TOPIC, 0, j, System.currentTimeMillis(), TimestampType.CREATE_TIME, 0, 0, str, bArr, new RecordHeaders(), Optional.empty());
    }
}
