package org.apache.kafka.connect.storage;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.LoggingContext;
import org.apache.kafka.connect.util.TopicAdmin;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Tests how {@link ConnectorOffsetBackingStore#set} reports the outcome of an offset flush when
 * the writes to its underlying stores succeed, fail, or never complete.
 *
 * <p>Every underlying store is a {@link KafkaOffsetBackingStore} whose log writes through a
 * {@link MockProducer}, so each test decides the fate of individual writes by calling
 * {@code completeNext()} or {@code errorNext(...)} on the producer of the store in question.
 * Following the test names, the store created on {@code "topic1"} is referred to as the primary
 * store and the one created on {@code "topic2"} as the secondary store.
 */
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ConnectorOffsetBackingStoreTest {
    private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(StandardCharsets.UTF_8);
    private static final byte[] OFFSET_KEY_SERIALIZED_1 = "key-serialized-1".getBytes(StandardCharsets.UTF_8);
    private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(StandardCharsets.UTF_8);
    private static final KafkaException PRODUCE_EXCEPTION = new KafkaException();

    /** How long, in milliseconds, the assertions wait on the future returned by {@code set}. */
    private static final long FLUSH_TIMEOUT_MS = 1000L;

    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();

    @Test
    public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() {
        MockProducer<byte[], byte[]> primaryProducer = createMockProducer();
        MockProducer<byte[], byte[]> secondaryProducer = createMockProducer();
        ConnectorOffsetBackingStore offsetBackingStore =
                createStoreWithPrimaryAndSecondary(primaryProducer, secondaryProducer);

        AtomicBoolean callbackInvoked = new AtomicBoolean();
        AtomicReference<Object> callbackResult = new AtomicReference<>();
        AtomicReference<Throwable> callbackError = new AtomicReference<>();

        Future<Void> setFuture = offsetBackingStore.set(
                getSerialisedOffsets(Utils.mkMap(
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED, (byte[]) null),
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED_1, OFFSET_VALUE_SERIALIZED))),
                recordingCallback(callbackInvoked, callbackResult, callbackError));

        assertNoPrematureCallbackInvocation(callbackInvoked);
        // Fail the first pending secondary write, then let one primary write through.
        secondaryProducer.errorNext(PRODUCE_EXCEPTION);
        primaryProducer.completeNext();
        assertFlushFailure(callbackInvoked, callbackResult, callbackError, setFuture, false);
    }

    @Test
    public void testFlushSuccessWhenWritesSucceedToBothPrimaryAndSecondaryStores() {
        MockProducer<byte[], byte[]> primaryProducer = createMockProducer();
        MockProducer<byte[], byte[]> secondaryProducer = createMockProducer();
        ConnectorOffsetBackingStore offsetBackingStore =
                createStoreWithPrimaryAndSecondary(primaryProducer, secondaryProducer);

        AtomicBoolean callbackInvoked = new AtomicBoolean();
        AtomicReference<Object> callbackResult = new AtomicReference<>();
        AtomicReference<Throwable> callbackError = new AtomicReference<>();

        Future<Void> setFuture = offsetBackingStore.set(
                getSerialisedOffsets(Utils.mkMap(
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED, (byte[]) null),
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED_1, OFFSET_VALUE_SERIALIZED))),
                recordingCallback(callbackInvoked, callbackResult, callbackError));

        assertNoPrematureCallbackInvocation(callbackInvoked);
        // Complete one secondary write, two primary writes, then the remaining secondary write.
        secondaryProducer.completeNext();
        primaryProducer.completeNext();
        primaryProducer.completeNext();
        secondaryProducer.completeNext();
        assertFlushSuccess(callbackInvoked, callbackResult, callbackError, setFuture);
    }

    @Test
    public void testFlushSuccessWhenWriteToSecondaryStoreFailsForRegularOffsets() {
        MockProducer<byte[], byte[]> primaryProducer = createMockProducer();
        MockProducer<byte[], byte[]> secondaryProducer = createMockProducer();
        ConnectorOffsetBackingStore offsetBackingStore =
                createStoreWithPrimaryAndSecondary(primaryProducer, secondaryProducer);

        AtomicBoolean callbackInvoked = new AtomicBoolean();
        AtomicReference<Object> callbackResult = new AtomicReference<>();
        AtomicReference<Throwable> callbackError = new AtomicReference<>();

        Future<Void> setFuture = offsetBackingStore.set(
                getSerialisedOffsets(Utils.mkMap(
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED, OFFSET_VALUE_SERIALIZED),
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED_1, (byte[]) null))),
                recordingCallback(callbackInvoked, callbackResult, callbackError));

        assertNoPrematureCallbackInvocation(callbackInvoked);
        // Only the last secondary write fails; everything before it completes normally.
        secondaryProducer.completeNext();
        primaryProducer.completeNext();
        primaryProducer.completeNext();
        secondaryProducer.errorNext(PRODUCE_EXCEPTION);
        assertFlushSuccess(callbackInvoked, callbackResult, callbackError, setFuture);
    }

    @Test
    public void testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceedsForRegularOffsets() {
        MockProducer<byte[], byte[]> primaryProducer = createMockProducer();
        MockProducer<byte[], byte[]> secondaryProducer = createMockProducer();
        ConnectorOffsetBackingStore offsetBackingStore =
                createStoreWithPrimaryAndSecondary(primaryProducer, secondaryProducer);

        AtomicBoolean callbackInvoked = new AtomicBoolean();
        AtomicReference<Object> callbackResult = new AtomicReference<>();
        AtomicReference<Throwable> callbackError = new AtomicReference<>();

        Future<Void> setFuture = offsetBackingStore.set(
                getSerialisedOffsets(Utils.mkMap(
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED, OFFSET_VALUE_SERIALIZED),
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED_1, OFFSET_VALUE_SERIALIZED))),
                recordingCallback(callbackInvoked, callbackResult, callbackError));

        assertNoPrematureCallbackInvocation(callbackInvoked);
        secondaryProducer.completeNext();
        primaryProducer.errorNext(PRODUCE_EXCEPTION);
        assertFlushFailure(callbackInvoked, callbackResult, callbackError, setFuture, false);
    }

    @Test
    public void testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceedsForRegularAndTombstoneOffsets() {
        MockProducer<byte[], byte[]> primaryProducer = createMockProducer();
        MockProducer<byte[], byte[]> secondaryProducer = createMockProducer();
        ConnectorOffsetBackingStore offsetBackingStore =
                createStoreWithPrimaryAndSecondary(primaryProducer, secondaryProducer);

        AtomicBoolean callbackInvoked = new AtomicBoolean();
        AtomicReference<Object> callbackResult = new AtomicReference<>();
        AtomicReference<Throwable> callbackError = new AtomicReference<>();

        Future<Void> setFuture = offsetBackingStore.set(
                getSerialisedOffsets(Utils.mkMap(
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED, (byte[]) null),
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED_1, OFFSET_VALUE_SERIALIZED))),
                recordingCallback(callbackInvoked, callbackResult, callbackError));

        assertNoPrematureCallbackInvocation(callbackInvoked);
        secondaryProducer.completeNext();
        primaryProducer.errorNext(PRODUCE_EXCEPTION);
        assertFlushFailure(callbackInvoked, callbackResult, callbackError, setFuture, false);
    }

    @Test
    public void testFlushSuccessWhenWritesToPrimaryStoreSucceedsWithNoSecondaryStore() {
        MockProducer<byte[], byte[]> primaryProducer = createMockProducer();
        ConnectorOffsetBackingStore offsetBackingStore = createStoreWithOnlyPrimary(primaryProducer);

        AtomicBoolean callbackInvoked = new AtomicBoolean();
        AtomicReference<Object> callbackResult = new AtomicReference<>();
        AtomicReference<Throwable> callbackError = new AtomicReference<>();

        Future<Void> setFuture = offsetBackingStore.set(
                getSerialisedOffsets(Utils.mkMap(
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED, OFFSET_VALUE_SERIALIZED))),
                recordingCallback(callbackInvoked, callbackResult, callbackError));

        assertNoPrematureCallbackInvocation(callbackInvoked);
        primaryProducer.completeNext();
        assertFlushSuccess(callbackInvoked, callbackResult, callbackError, setFuture);
    }

    @Test
    public void testFlushFailureWhenWritesToPrimaryStoreFailsWithNoSecondaryStore() {
        MockProducer<byte[], byte[]> primaryProducer = createMockProducer();
        ConnectorOffsetBackingStore offsetBackingStore = createStoreWithOnlyPrimary(primaryProducer);

        AtomicBoolean callbackInvoked = new AtomicBoolean();
        AtomicReference<Object> callbackResult = new AtomicReference<>();
        AtomicReference<Throwable> callbackError = new AtomicReference<>();

        Future<Void> setFuture = offsetBackingStore.set(
                getSerialisedOffsets(Utils.mkMap(
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED, OFFSET_VALUE_SERIALIZED))),
                recordingCallback(callbackInvoked, callbackResult, callbackError));

        assertNoPrematureCallbackInvocation(callbackInvoked);
        primaryProducer.errorNext(PRODUCE_EXCEPTION);
        assertFlushFailure(callbackInvoked, callbackResult, callbackError, setFuture, false);
    }

    @Test
    public void testFlushFailureWhenWritesToPrimaryStoreTimesoutAndSecondarySucceedsForTombstoneOffsets() {
        MockProducer<byte[], byte[]> primaryProducer = createMockProducer();
        MockProducer<byte[], byte[]> secondaryProducer = createMockProducer();
        ConnectorOffsetBackingStore offsetBackingStore =
                createStoreWithPrimaryAndSecondary(primaryProducer, secondaryProducer);

        AtomicBoolean callbackInvoked = new AtomicBoolean();
        AtomicReference<Object> callbackResult = new AtomicReference<>();
        AtomicReference<Throwable> callbackError = new AtomicReference<>();

        Future<Void> setFuture = offsetBackingStore.set(
                getSerialisedOffsets(Utils.mkMap(
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED, (byte[]) null))),
                recordingCallback(callbackInvoked, callbackResult, callbackError));

        assertNoPrematureCallbackInvocation(callbackInvoked);
        // The primary producer is deliberately never completed, so the flush must time out.
        secondaryProducer.completeNext();
        assertFlushFailure(callbackInvoked, callbackResult, callbackError, setFuture, true);
    }

    @Test
    public void testFlushFailureWhenWritesToSecondaryStoreTimesoutForTombstoneOffsets() {
        MockProducer<byte[], byte[]> primaryProducer = createMockProducer();
        MockProducer<byte[], byte[]> secondaryProducer = createMockProducer();
        ConnectorOffsetBackingStore offsetBackingStore =
                createStoreWithPrimaryAndSecondary(primaryProducer, secondaryProducer);

        AtomicBoolean callbackInvoked = new AtomicBoolean();
        AtomicReference<Object> callbackResult = new AtomicReference<>();
        AtomicReference<Throwable> callbackError = new AtomicReference<>();

        Future<Void> setFuture = offsetBackingStore.set(
                getSerialisedOffsets(Utils.mkMap(
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED, (byte[]) null))),
                recordingCallback(callbackInvoked, callbackResult, callbackError));

        assertNoPrematureCallbackInvocation(callbackInvoked);
        // Neither producer is completed, so the flush must time out.
        assertFlushFailure(callbackInvoked, callbackResult, callbackError, setFuture, true);
    }

    @Test
    public void testFlushSuccessWhenWritesToSecondaryStoreTimesoutForRegularOffsets() {
        MockProducer<byte[], byte[]> primaryProducer = createMockProducer();
        MockProducer<byte[], byte[]> secondaryProducer = createMockProducer();
        ConnectorOffsetBackingStore offsetBackingStore =
                createStoreWithPrimaryAndSecondary(primaryProducer, secondaryProducer);

        AtomicBoolean callbackInvoked = new AtomicBoolean();
        AtomicReference<Object> callbackResult = new AtomicReference<>();
        AtomicReference<Throwable> callbackError = new AtomicReference<>();

        Future<Void> setFuture = offsetBackingStore.set(
                getSerialisedOffsets(Utils.mkMap(
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED, (byte[]) null),
                        Utils.mkEntry(OFFSET_KEY_SERIALIZED_1, OFFSET_VALUE_SERIALIZED))),
                recordingCallback(callbackInvoked, callbackResult, callbackError));

        assertNoPrematureCallbackInvocation(callbackInvoked);
        // One secondary write and both primary writes complete; any further secondary write is
        // left pending for the rest of the test.
        secondaryProducer.completeNext();
        primaryProducer.completeNext();
        primaryProducer.completeNext();
        assertFlushSuccess(callbackInvoked, callbackResult, callbackError, setFuture);
    }

    /** Asserts that the store callback has not fired before any producer write was resolved. */
    private void assertNoPrematureCallbackInvocation(AtomicBoolean callbackInvoked) {
        Assert.assertFalse("Store callback should not be invoked before underlying producer callback", callbackInvoked.get());
    }

    /**
     * Asserts that the flush represented by {@code setFuture} did not succeed.
     *
     * @param callbackInvoked flag set by the store callback when it runs
     * @param callbackResult result captured by the store callback
     * @param callbackError error captured by the store callback
     * @param setFuture future returned by {@code set}
     * @param timeout if {@code true}, the future is only expected to time out after
     *     {@link #FLUSH_TIMEOUT_MS} and the callback state is not inspected; if {@code false},
     *     the future must fail with {@link #PRODUCE_EXCEPTION} as its cause and the callback must
     *     have received that same exception and a {@code null} result
     */
    private void assertFlushFailure(AtomicBoolean callbackInvoked, AtomicReference<Object> callbackResult, AtomicReference<Throwable> callbackError, Future<Void> setFuture, boolean timeout) {
        if (timeout) {
            Assert.assertThrows(TimeoutException.class, () -> setFuture.get(FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS));
            return;
        }
        ExecutionException failure = Assert.assertThrows(ExecutionException.class, () -> setFuture.get(FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS));
        Assert.assertNotNull(failure.getCause());
        Assert.assertEquals(PRODUCE_EXCEPTION, failure.getCause());
        Assert.assertTrue(callbackInvoked.get());
        Assert.assertNull(callbackResult.get());
        Assert.assertEquals(PRODUCE_EXCEPTION, callbackError.get());
    }

    /**
     * Asserts that {@code setFuture} completes without throwing within {@link #FLUSH_TIMEOUT_MS}
     * and that the store callback ran with neither a result nor an error.
     */
    private void assertFlushSuccess(AtomicBoolean callbackInvoked, AtomicReference<Object> callbackResult, AtomicReference<Throwable> callbackError, Future<Void> setFuture) {
        Assertions.assertDoesNotThrow(() -> setFuture.get(FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS));
        Assert.assertTrue(callbackInvoked.get());
        Assert.assertNull(callbackResult.get());
        Assert.assertNull(callbackError.get());
    }

    /**
     * Returns a callback that records that it was invoked, together with the result and error
     * it was invoked with, into the supplied holders.
     */
    private static Callback<Void> recordingCallback(AtomicBoolean callbackInvoked, AtomicReference<Object> callbackResult, AtomicReference<Throwable> callbackError) {
        return (error, result) -> {
            callbackInvoked.set(true);
            callbackResult.set(result);
            callbackError.set(error);
        };
    }

    /**
     * Builds the store under test from two underlying stores: the secondary one on
     * {@code "topic2"} and the primary one on {@code "topic1"}, each writing through the given
     * producer.
     */
    private ConnectorOffsetBackingStore createStoreWithPrimaryAndSecondary(MockProducer<byte[], byte[]> primaryProducer, MockProducer<byte[], byte[]> secondaryProducer) {
        return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
                () -> LoggingContext.forConnector("source-connector"),
                createStore("topic2", secondaryProducer),
                createStore("topic1", primaryProducer),
                "offsets-topic",
                Mockito.mock(TopicAdmin.class));
    }

    /** Builds the store under test with only the primary store on {@code "topic1"}. */
    private ConnectorOffsetBackingStore createStoreWithOnlyPrimary(MockProducer<byte[], byte[]> primaryProducer) {
        return ConnectorOffsetBackingStore.withOnlyConnectorStore(
                () -> LoggingContext.forConnector("source-connector"),
                createStore("topic1", primaryProducer),
                "offsets-topic",
                Mockito.mock(TopicAdmin.class));
    }

    /**
     * Creates a {@link KafkaOffsetBackingStore} whose offset log is an already started
     * {@link KafkaBasedLog} for {@code topic} that hands out the given producer and a
     * {@link MockConsumer} built by {@link #createMockConsumer(String)}.
     */
    private KafkaOffsetBackingStore createStore(final String topic, final Producer<byte[], byte[]> producer) {
        KafkaOffsetBackingStore store = new KafkaOffsetBackingStore(
                () -> Mockito.mock(TopicAdmin.class),
                () -> "connect",
                Mockito.mock(Converter.class));
        KafkaBasedLog<byte[], byte[]> offsetLog = new KafkaBasedLog<byte[], byte[]>(
                topic,
                new HashMap<>(),
                new HashMap<>(),
                () -> Mockito.mock(TopicAdmin.class),
                (Callback) Mockito.mock(Callback.class),
                new MockTime(),
                null) {
            @Override
            protected Producer<byte[], byte[]> createProducer() {
                return producer;
            }

            @Override
            protected Consumer<byte[], byte[]> createConsumer() {
                return ConnectorOffsetBackingStoreTest.this.createMockConsumer(topic);
            }
        };
        offsetLog.start();
        store.offsetLog = offsetLog;
        return store;
    }

    /**
     * Creates a {@link MockConsumer} that knows a single partition 0 of {@code topic} and
     * reports 100 as that partition's beginning offset.
     */
    public MockConsumer<byte[], byte[]> createMockConsumer(String topic) {
        MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
        Node noNode = Node.noNode();
        Node[] nodes = {noNode};
        mockConsumer.updatePartitions(topic, Collections.singletonList(new PartitionInfo(topic, 0, noNode, nodes, nodes)));
        mockConsumer.updateBeginningOffsets(Utils.mkMap(Utils.mkEntry(new TopicPartition(topic, 0), 100L)));
        return mockConsumer;
    }

    /**
     * Creates a producer for an empty cluster that uses byte-array serializers for keys and
     * values. The tests resolve its pending sends explicitly through {@code completeNext()} and
     * {@code errorNext(...)}.
     */
    private MockProducer<byte[], byte[]> createMockProducer() {
        return new MockProducer<>(Cluster.empty(), false, (Partitioner) null, this.byteArraySerializer, this.byteArraySerializer);
    }

    /**
     * Wraps every key and non-null value of {@code offsets} in a {@link ByteBuffer}; a
     * {@code null} value (a tombstone, in the tests' terms) stays {@code null}.
     */
    private Map<ByteBuffer, ByteBuffer> getSerialisedOffsets(Map<byte[], byte[]> offsets) {
        Map<ByteBuffer, ByteBuffer> serialisedOffsets = new HashMap<>();
        for (Map.Entry<byte[], byte[]> offset : offsets.entrySet()) {
            byte[] value = offset.getValue();
            serialisedOffsets.put(ByteBuffer.wrap(offset.getKey()), value == null ? null : ByteBuffer.wrap(value));
        }
        return serialisedOffsets;
    }
}
