package org.apache.kafka.storage.internals.log;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/storage/internals/log/ProducerStateManagerTest.class */
public class ProducerStateManagerTest {
    private final long producerId = 1;
    private final short epoch = 0;
    private final int defaultSequence = 0;
    private final int maxTransactionTimeoutMs = 300000;
    private final long lateTransactionTimeoutMs = 600000;
    private final File logDir = TestUtils.tempDirectory();
    private final TopicPartition partition = new TopicPartition("test", 0);
    private final ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true);
    private final MockTime time = new MockTime();
    private final ProducerStateManager stateManager = new ProducerStateManager(this.partition, this.logDir, 300000, this.producerStateManagerConfig, this.time);

    @AfterEach
    public void tearDown() throws IOException {
        Utils.delete(this.logDir);
    }

    @Test
    public void testBasicIdMapping() {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, 0L, false);
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 0L, 1L, false);
        Assertions.assertThrows(OutOfOrderSequenceException.class, () -> {
            appendClientEntry(this.stateManager, 1L, (short) 0, 1, 0L, 1L, false);
        });
        Assertions.assertThrows(OutOfOrderSequenceException.class, () -> {
            appendClientEntry(this.stateManager, 1L, (short) 0, 5, 0L, 2L, false);
        });
        appendClientEntry(this.stateManager, 1L, (short) 1, 0, 0L, 3L, false);
        Assertions.assertThrows(InvalidProducerEpochException.class, () -> {
            appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, 4L, false);
        });
    }

    @Test
    public void testAppendTxnMarkerWithNoProducerState() {
        short s = 2;
        appendEndTxnMarker(this.stateManager, 1L, (short) 2, ControlRecordType.COMMIT, 27L);
        ProducerStateEntry lastEntryOrElseThrownByProducerId = getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        Assertions.assertEquals((short) 2, lastEntryOrElseThrownByProducerId.producerEpoch());
        Assertions.assertEquals(1L, lastEntryOrElseThrownByProducerId.producerId());
        Assertions.assertEquals(-1, lastEntryOrElseThrownByProducerId.lastSeq());
        Assertions.assertThrows(InvalidProducerEpochException.class, () -> {
            appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, 4L, false);
        });
        Assertions.assertThrows(OutOfOrderSequenceException.class, () -> {
            appendClientEntry(this.stateManager, 1L, s, 17, 0L, 4L, false);
        });
        appendClientEntry(this.stateManager, 1L, (short) 2, 0, 39L, 4L, false);
        ProducerStateEntry lastEntryOrElseThrownByProducerId2 = getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        Assertions.assertEquals((short) 2, lastEntryOrElseThrownByProducerId2.producerEpoch());
        Assertions.assertEquals(1L, lastEntryOrElseThrownByProducerId2.producerId());
        Assertions.assertEquals(0, lastEntryOrElseThrownByProducerId2.lastSeq());
    }

    @Test
    public void testProducerSequenceWrapAround() {
        appendReplicationEntry(this.stateManager, (short) 15, Integer.MAX_VALUE, 735L);
        appendClientEntry(this.stateManager, 1L, (short) 15, 0, 735 + 500, false);
        ProducerStateEntry lastEntryOrElseThrownByProducerId = getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        Assertions.assertEquals((short) 15, lastEntryOrElseThrownByProducerId.producerEpoch());
        Assertions.assertEquals(Integer.MAX_VALUE, lastEntryOrElseThrownByProducerId.firstSeq());
        Assertions.assertEquals(0, lastEntryOrElseThrownByProducerId.lastSeq());
    }

    @Test
    public void testProducerSequenceWithWrapAroundBatchRecord() {
        ProducerAppendInfo prepareUpdate = this.stateManager.prepareUpdate(1L, AppendOrigin.REPLICATION);
        prepareUpdate.appendDataBatch((short) 15, 2147483637, 9, this.time.milliseconds(), new LogOffsetMetadata(2000L), 2020L, false);
        Assertions.assertEquals(Optional.empty(), this.stateManager.lastEntry(1L));
        this.stateManager.update(prepareUpdate);
        ProducerStateEntry lastEntryOrElseThrownByProducerId = getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        Assertions.assertEquals(2147483637, lastEntryOrElseThrownByProducerId.firstSeq());
        Assertions.assertEquals(9, lastEntryOrElseThrownByProducerId.lastSeq());
        Assertions.assertEquals(2000L, lastEntryOrElseThrownByProducerId.firstDataOffset());
        Assertions.assertEquals(2020L, lastEntryOrElseThrownByProducerId.lastDataOffset());
    }

    @Test
    public void testProducerSequenceInvalidWrapAround() {
        short s = 15;
        long j = 735;
        appendReplicationEntry(this.stateManager, (short) 15, Integer.MAX_VALUE, 735L);
        Assertions.assertThrows(OutOfOrderSequenceException.class, () -> {
            appendClientEntry(this.stateManager, 1L, s, 1, j + 500, false);
        });
    }

    @Test
    public void testNoValidationOnFirstEntryWhenLoadingLog() {
        appendReplicationEntry(this.stateManager, (short) 5, 16, 735L);
        ProducerStateEntry lastEntryOrElseThrownByProducerId = getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        Assertions.assertEquals((short) 5, lastEntryOrElseThrownByProducerId.producerEpoch());
        Assertions.assertEquals(16, lastEntryOrElseThrownByProducerId.firstSeq());
        Assertions.assertEquals(16, lastEntryOrElseThrownByProducerId.lastSeq());
        Assertions.assertEquals(735L, lastEntryOrElseThrownByProducerId.lastDataOffset());
        Assertions.assertEquals(735L, lastEntryOrElseThrownByProducerId.firstDataOffset());
    }

    @Test
    public void testControlRecordBumpsProducerEpoch() {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, false);
        appendEndTxnMarker(this.stateManager, 1L, (short) 1, ControlRecordType.ABORT, 1L);
        ProducerStateEntry lastEntryOrElseThrownByProducerId = getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        Assertions.assertEquals((short) 1, lastEntryOrElseThrownByProducerId.producerEpoch());
        Assertions.assertEquals(OptionalLong.empty(), lastEntryOrElseThrownByProducerId.currentTxnFirstOffset());
        Assertions.assertEquals(-1, lastEntryOrElseThrownByProducerId.firstSeq());
        Assertions.assertEquals(-1, lastEntryOrElseThrownByProducerId.lastSeq());
        appendClientEntry(this.stateManager, 1L, (short) 1, 0, 2L, false);
        Assertions.assertEquals(0, getLastEntryOrElseThrownByProducerId(this.stateManager, 1L).firstSeq());
    }

    @Test
    public void testTxnFirstOffsetMetadataCached() {
        ProducerAppendInfo producerAppendInfo = new ProducerAppendInfo(this.partition, 1L, ProducerStateEntry.empty(1L), AppendOrigin.CLIENT, this.stateManager.maybeCreateVerificationStateEntry(1L, 0, (short) 0));
        LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(992342L, 990000L, 234224);
        producerAppendInfo.appendDataBatch((short) 0, 0, 0, this.time.milliseconds(), logOffsetMetadata, 992342L, true);
        this.stateManager.update(producerAppendInfo);
        Assertions.assertEquals(Optional.of(logOffsetMetadata), this.stateManager.firstUnstableOffset());
    }

    @Test
    public void testHasLateTransaction() {
        appendClientEntry(this.stateManager, 39L, (short) 2, 0, 100L, true);
        Assertions.assertFalse(this.stateManager.hasLateTransaction(this.time.milliseconds()));
        this.time.sleep(500L);
        appendClientEntry(this.stateManager, 57L, (short) 9, 0, 150L, true);
        Assertions.assertFalse(this.stateManager.hasLateTransaction(this.time.milliseconds()));
        this.time.sleep(599501L);
        Assertions.assertTrue(this.stateManager.hasLateTransaction(this.time.milliseconds()));
        this.time.sleep(500L);
        Assertions.assertTrue(this.stateManager.hasLateTransaction(this.time.milliseconds()));
        appendEndTxnMarker(this.stateManager, 39L, (short) 2, ControlRecordType.COMMIT, 200L);
        Assertions.assertTrue(this.stateManager.hasLateTransaction(this.time.milliseconds()));
        appendEndTxnMarker(this.stateManager, 57L, (short) 9, ControlRecordType.COMMIT, 250L);
        Assertions.assertFalse(this.stateManager.hasLateTransaction(this.time.milliseconds()));
    }

    @Test
    public void testHasLateTransactionInitializedAfterReload() throws IOException {
        appendClientEntry(this.stateManager, 39L, (short) 2, 0, 100L, true);
        Assertions.assertFalse(this.stateManager.hasLateTransaction(this.time.milliseconds()));
        this.time.sleep(500L);
        appendClientEntry(this.stateManager, 57L, (short) 9, 0, 150L, true);
        Assertions.assertFalse(this.stateManager.hasLateTransaction(this.time.milliseconds()));
        this.stateManager.takeSnapshot();
        this.time.sleep(599501L);
        Assertions.assertTrue(this.stateManager.hasLateTransaction(this.time.milliseconds()));
        ProducerStateManager producerStateManager = new ProducerStateManager(this.partition, this.logDir, 300000, this.producerStateManagerConfig, this.time);
        producerStateManager.truncateAndReload(0L, this.stateManager.mapEndOffset(), this.time.milliseconds());
        Assertions.assertTrue(producerStateManager.hasLateTransaction(this.time.milliseconds()));
    }

    @Test
    public void testHasLateTransactionUpdatedAfterPartialTruncation() throws IOException {
        appendClientEntry(this.stateManager, 39L, (short) 2, 0, 100L, true);
        Assertions.assertFalse(this.stateManager.hasLateTransaction(this.time.milliseconds()));
        this.time.sleep(600001L);
        Assertions.assertTrue(this.stateManager.hasLateTransaction(this.time.milliseconds()));
        this.stateManager.truncateAndReload(0L, 80L, this.time.milliseconds());
        Assertions.assertFalse(this.stateManager.hasLateTransaction(this.time.milliseconds()));
    }

    @Test
    public void testHasLateTransactionUpdatedAfterFullTruncation() throws IOException {
        appendClientEntry(this.stateManager, 39L, (short) 2, 0, 100L, true);
        Assertions.assertFalse(this.stateManager.hasLateTransaction(this.time.milliseconds()));
        this.time.sleep(600001L);
        Assertions.assertTrue(this.stateManager.hasLateTransaction(this.time.milliseconds()));
        this.stateManager.truncateFullyAndStartAt(150L);
        Assertions.assertFalse(this.stateManager.hasLateTransaction(this.time.milliseconds()));
    }

    @Test
    public void testPrepareUpdateDoesNotMutate() {
        ProducerAppendInfo prepareUpdate = this.stateManager.prepareUpdate(1L, AppendOrigin.CLIENT);
        prepareUpdate.appendDataBatch((short) 0, 0, 5, this.time.milliseconds(), new LogOffsetMetadata(15L), 20L, false);
        Assertions.assertEquals(Optional.empty(), this.stateManager.lastEntry(1L));
        this.stateManager.update(prepareUpdate);
        getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        ProducerAppendInfo prepareUpdate2 = this.stateManager.prepareUpdate(1L, AppendOrigin.CLIENT);
        prepareUpdate2.appendDataBatch((short) 0, 6, 10, this.time.milliseconds(), new LogOffsetMetadata(26L), 30L, false);
        ProducerStateEntry lastEntryOrElseThrownByProducerId = getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        Assertions.assertEquals(0, lastEntryOrElseThrownByProducerId.firstSeq());
        Assertions.assertEquals(5, lastEntryOrElseThrownByProducerId.lastSeq());
        Assertions.assertEquals(20L, lastEntryOrElseThrownByProducerId.lastDataOffset());
        this.stateManager.update(prepareUpdate2);
        ProducerStateEntry lastEntryOrElseThrownByProducerId2 = getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        Assertions.assertEquals(0, lastEntryOrElseThrownByProducerId2.firstSeq());
        Assertions.assertEquals(10, lastEntryOrElseThrownByProducerId2.lastSeq());
        Assertions.assertEquals(30L, lastEntryOrElseThrownByProducerId2.lastDataOffset());
    }

    @Test
    public void updateProducerTransactionState() {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 9L, false);
        ProducerAppendInfo prepareUpdate = this.stateManager.prepareUpdate(1L, AppendOrigin.CLIENT);
        prepareUpdate.appendDataBatch((short) 0, 1, 5, this.time.milliseconds(), new LogOffsetMetadata(16L), 20L, true);
        verifyLastEntryWithTxnData(prepareUpdate.toEntry(), 1, 5L, 16L, 20L, OptionalLong.of(16L), prepareUpdate);
        prepareUpdate.appendDataBatch((short) 0, 6, 10, this.time.milliseconds(), new LogOffsetMetadata(26L), 30L, true);
        verifyLastEntryWithTxnData(prepareUpdate.toEntry(), 1, 10L, 16L, 30L, OptionalLong.of(16L), prepareUpdate);
        CompletedTxn completedTxn = (CompletedTxn) prepareUpdate.appendEndTxnMarker(new EndTransactionMarker(ControlRecordType.COMMIT, 15), (short) 0, 40L, this.time.milliseconds()).orElseThrow(() -> {
            return new RuntimeException("The transaction should be completed");
        });
        Assertions.assertEquals(1L, completedTxn.producerId);
        Assertions.assertEquals(16L, completedTxn.firstOffset);
        Assertions.assertEquals(40L, completedTxn.lastOffset);
        Assertions.assertFalse(completedTxn.isAborted);
        ProducerStateEntry entry = prepareUpdate.toEntry();
        verifyLastEntryWithTxnData(entry, 1, 10L, 16L, 30L, OptionalLong.empty(), prepareUpdate);
        Assertions.assertEquals(OptionalLong.empty(), entry.currentTxnFirstOffset());
    }

    @Test
    public void testOutOfSequenceAfterControlRecordEpochBump() {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, true);
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 1L, true);
        short s = 1;
        appendEndTxnMarker(this.stateManager, 1L, (short) 1, ControlRecordType.ABORT, 1L);
        Assertions.assertThrows(OutOfOrderSequenceException.class, () -> {
            appendClientEntry(this.stateManager, 1L, s, 2, 2L, true);
        });
        Assertions.assertThrows(OutOfOrderSequenceException.class, () -> {
            appendClientEntry(this.stateManager, 1L, (short) (s + 1), 2, 2L, true);
        });
        appendClientEntry(this.stateManager, 1L, (short) 1, 0, 0L, true);
        ProducerStateEntry lastEntryOrElseThrownByProducerId = getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        Assertions.assertEquals((short) 1, lastEntryOrElseThrownByProducerId.producerEpoch());
        Assertions.assertEquals(0, lastEntryOrElseThrownByProducerId.lastSeq());
    }

    @Test
    public void testNonTransactionalAppendWithOngoingTransaction() {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, true);
        Assertions.assertThrows(InvalidTxnStateException.class, () -> {
            appendClientEntry(this.stateManager, 1L, (short) 0, 1, 1L, false);
        });
    }

    @Test
    public void testTruncateAndReloadRemovesOutOfRangeSnapshots() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, false);
        this.stateManager.takeSnapshot();
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 1L, false);
        this.stateManager.takeSnapshot();
        appendClientEntry(this.stateManager, 1L, (short) 0, 2, 2L, false);
        this.stateManager.takeSnapshot();
        appendClientEntry(this.stateManager, 1L, (short) 0, 3, 3L, false);
        this.stateManager.takeSnapshot();
        appendClientEntry(this.stateManager, 1L, (short) 0, 4, 4L, false);
        this.stateManager.takeSnapshot();
        this.stateManager.truncateAndReload(1L, 3L, this.time.milliseconds());
        Assertions.assertEquals(OptionalLong.of(2L), this.stateManager.oldestSnapshotOffset());
        Assertions.assertEquals(OptionalLong.of(3L), this.stateManager.latestSnapshotOffset());
    }

    @Test
    public void testTakeSnapshot() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, 0L, false);
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 1L, 1L, false);
        this.stateManager.takeSnapshot();
        String[] strArr = (String[]) Objects.requireNonNull(this.logDir.list());
        Assertions.assertEquals(1, strArr.length, "Directory doesn't contain a single file as expected");
        Assertions.assertFalse(strArr[0].isEmpty(), "Snapshot file is empty");
    }

    @Test
    public void testFetchSnapshotEmptySnapShot() {
        Assertions.assertEquals(Optional.empty(), this.stateManager.fetchSnapshot(1));
    }

    @Test
    public void testRecoverFromSnapshotUnfinishedTransaction() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, true);
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 1L, true);
        this.stateManager.takeSnapshot();
        ProducerStateManager producerStateManager = new ProducerStateManager(this.partition, this.logDir, 300000, this.producerStateManagerConfig, this.time);
        producerStateManager.truncateAndReload(0L, 3L, this.time.milliseconds());
        ProducerStateEntry lastEntryOrElseThrownByProducerId = getLastEntryOrElseThrownByProducerId(producerStateManager, 1L);
        Assertions.assertEquals(1L, lastEntryOrElseThrownByProducerId.firstDataOffset());
        Assertions.assertEquals(1, lastEntryOrElseThrownByProducerId.firstSeq());
        Assertions.assertEquals(1L, lastEntryOrElseThrownByProducerId.lastDataOffset());
        Assertions.assertEquals(1, lastEntryOrElseThrownByProducerId.lastSeq());
        Assertions.assertEquals(OptionalLong.of(0L), lastEntryOrElseThrownByProducerId.currentTxnFirstOffset());
        Assertions.assertDoesNotThrow(() -> {
            appendClientEntry(producerStateManager, 1L, (short) 0, 2, 2L, true);
        });
    }

    @Test
    public void testRecoverFromSnapshotFinishedTransaction() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, true);
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 1L, true);
        appendEndTxnMarker(this.stateManager, 1L, (short) 0, ControlRecordType.ABORT, 2L);
        this.stateManager.takeSnapshot();
        ProducerStateManager producerStateManager = new ProducerStateManager(this.partition, this.logDir, 300000, this.producerStateManagerConfig, this.time);
        producerStateManager.truncateAndReload(0L, 3L, this.time.milliseconds());
        ProducerStateEntry lastEntryOrElseThrownByProducerId = getLastEntryOrElseThrownByProducerId(producerStateManager, 1L);
        Assertions.assertEquals(1L, lastEntryOrElseThrownByProducerId.firstDataOffset());
        Assertions.assertEquals(1, lastEntryOrElseThrownByProducerId.firstSeq());
        Assertions.assertEquals(1L, lastEntryOrElseThrownByProducerId.lastDataOffset());
        Assertions.assertEquals(1, lastEntryOrElseThrownByProducerId.lastSeq());
        Assertions.assertEquals(OptionalLong.empty(), lastEntryOrElseThrownByProducerId.currentTxnFirstOffset());
    }

    @Test
    public void testRecoverFromSnapshotEmptyTransaction() throws IOException {
        long milliseconds = this.time.milliseconds();
        appendEndTxnMarker(this.stateManager, 1L, (short) 0, ControlRecordType.ABORT, 0L, 0, milliseconds);
        this.stateManager.takeSnapshot();
        ProducerStateManager producerStateManager = new ProducerStateManager(this.partition, this.logDir, 300000, this.producerStateManagerConfig, this.time);
        producerStateManager.truncateAndReload(0L, 1L, this.time.milliseconds());
        ProducerStateEntry lastEntryOrElseThrownByProducerId = getLastEntryOrElseThrownByProducerId(producerStateManager, 1L);
        Assertions.assertEquals(milliseconds, lastEntryOrElseThrownByProducerId.lastTimestamp());
        Assertions.assertEquals(OptionalLong.empty(), lastEntryOrElseThrownByProducerId.currentTxnFirstOffset());
    }

    @Test
    public void testProducerStateAfterFencingAbortMarker() {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, true);
        appendEndTxnMarker(this.stateManager, 1L, (short) 1, ControlRecordType.ABORT, 1L);
        ProducerStateEntry lastEntryOrElseThrownByProducerId = getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        Assertions.assertEquals(OptionalLong.empty(), lastEntryOrElseThrownByProducerId.currentTxnFirstOffset());
        Assertions.assertEquals(-1L, lastEntryOrElseThrownByProducerId.lastDataOffset());
        Assertions.assertEquals(-1L, lastEntryOrElseThrownByProducerId.firstDataOffset());
        this.stateManager.removeExpiredProducers(this.time.milliseconds());
        Assertions.assertDoesNotThrow(() -> {
            return getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        });
    }

    @Test
    public void testRemoveExpiredPidsOnReload() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, 0L, false);
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 1L, 1L, false);
        this.stateManager.takeSnapshot();
        ProducerStateManager producerStateManager = new ProducerStateManager(this.partition, this.logDir, 300000, this.producerStateManagerConfig, this.time);
        producerStateManager.truncateAndReload(0L, 1L, 70000L);
        appendClientEntry(producerStateManager, 1L, (short) 0, 2, 2L, 70001L, false);
        Assertions.assertEquals(1, producerStateManager.activeProducers().size());
        Assertions.assertEquals(2, ((ProducerStateEntry) producerStateManager.activeProducers().values().iterator().next()).lastSeq());
        Assertions.assertEquals(3L, producerStateManager.mapEndOffset());
    }

    @Test
    public void testAcceptAppendWithoutProducerStateOnReplica() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, 0L, false);
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 1L, 1L, false);
        this.stateManager.takeSnapshot();
        ProducerStateManager producerStateManager = new ProducerStateManager(this.partition, this.logDir, 300000, this.producerStateManagerConfig, this.time);
        producerStateManager.truncateAndReload(0L, 1L, 70000L);
        Assertions.assertFalse(producerStateManager.activeProducers().containsKey(1L));
        appendReplicationEntry(producerStateManager, (short) 0, 2, 2L, 70001L);
        Assertions.assertTrue(producerStateManager.activeProducers().containsKey(1L));
        ProducerStateEntry producerStateEntry = (ProducerStateEntry) producerStateManager.activeProducers().get(1L);
        Assertions.assertEquals((short) 0, producerStateEntry.producerEpoch());
        Assertions.assertEquals(2, producerStateEntry.firstSeq());
        Assertions.assertEquals(2, producerStateEntry.lastSeq());
    }

    @Test
    public void testAcceptAppendWithSequenceGapsOnReplica() {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, 0L, false);
        int i = 3;
        Assertions.assertThrows(OutOfOrderSequenceException.class, () -> {
            appendClientEntry(this.stateManager, 1L, (short) 0, i, 1L, 1L, false);
        });
        Assertions.assertTrue(this.stateManager.activeProducers().containsKey(1L));
        Assertions.assertNotNull((ProducerStateEntry) this.stateManager.activeProducers().get(1L));
        Assertions.assertEquals(0L, r0.lastSeq());
        appendReplicationEntry(this.stateManager, (short) 0, 3, 1L, 1L);
        ProducerStateEntry producerStateEntry = (ProducerStateEntry) this.stateManager.activeProducers().get(1L);
        Assertions.assertNotNull(producerStateEntry);
        Assertions.assertEquals(3, producerStateEntry.lastSeq());
    }

    @Test
    public void testDeleteSnapshotsBefore() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, false);
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 1L, false);
        this.stateManager.takeSnapshot();
        Assertions.assertEquals(1, ((File[]) Objects.requireNonNull(this.logDir.listFiles())).length);
        Assertions.assertEquals(Collections.singleton(2L), currentSnapshotOffsets());
        appendClientEntry(this.stateManager, 1L, (short) 0, 2, 2L, false);
        this.stateManager.takeSnapshot();
        Assertions.assertEquals(2, ((File[]) Objects.requireNonNull(this.logDir.listFiles())).length);
        Assertions.assertEquals(new HashSet(Arrays.asList(2L, 3L)), currentSnapshotOffsets());
        this.stateManager.deleteSnapshotsBefore(3L);
        Assertions.assertEquals(1, ((File[]) Objects.requireNonNull(this.logDir.listFiles())).length);
        Assertions.assertEquals(Collections.singleton(3L), currentSnapshotOffsets());
        this.stateManager.deleteSnapshotsBefore(4L);
        Assertions.assertEquals(0, ((File[]) Objects.requireNonNull(this.logDir.listFiles())).length);
        Assertions.assertEquals(Collections.emptySet(), currentSnapshotOffsets());
    }

    @Test
    public void testTruncateFullyAndStartAt() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, false);
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 1L, false);
        this.stateManager.takeSnapshot();
        Assertions.assertEquals(1, ((File[]) Objects.requireNonNull(this.logDir.listFiles())).length);
        Assertions.assertEquals(Collections.singleton(2L), currentSnapshotOffsets());
        appendClientEntry(this.stateManager, 1L, (short) 0, 2, 2L, false);
        this.stateManager.takeSnapshot();
        Assertions.assertEquals(2, ((File[]) Objects.requireNonNull(this.logDir.listFiles())).length);
        Assertions.assertEquals(new HashSet(Arrays.asList(2L, 3L)), currentSnapshotOffsets());
        this.stateManager.truncateFullyAndStartAt(0L);
        Assertions.assertEquals(0, ((File[]) Objects.requireNonNull(this.logDir.listFiles())).length);
        Assertions.assertEquals(Collections.emptySet(), currentSnapshotOffsets());
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, false);
        this.stateManager.takeSnapshot();
        Assertions.assertEquals(1, ((File[]) Objects.requireNonNull(this.logDir.listFiles())).length);
        Assertions.assertEquals(Collections.singleton(1L), currentSnapshotOffsets());
    }

    @Test
    public void testReloadSnapshots() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 1L, false);
        appendClientEntry(this.stateManager, 1L, (short) 0, 2, 2L, false);
        this.stateManager.takeSnapshot();
        Map map = (Map) Arrays.stream((Object[]) Objects.requireNonNull(this.logDir.listFiles())).map((v0) -> {
            return v0.toPath();
        }).collect(Collectors.toMap(path -> {
            return path;
        }, path2 -> {
            return (byte[]) Assertions.assertDoesNotThrow(() -> {
                return Files.readAllBytes(path2);
            });
        }));
        appendClientEntry(this.stateManager, 1L, (short) 0, 3, 3L, false);
        appendClientEntry(this.stateManager, 1L, (short) 0, 4, 4L, false);
        this.stateManager.takeSnapshot();
        Assertions.assertEquals(2, ((File[]) Objects.requireNonNull(this.logDir.listFiles())).length);
        Assertions.assertEquals(new HashSet(Arrays.asList(3L, 5L)), currentSnapshotOffsets());
        this.stateManager.truncateAndReload(3L, 5L, this.time.milliseconds());
        Assertions.assertEquals(1, ((File[]) Objects.requireNonNull(this.logDir.listFiles())).length);
        Assertions.assertEquals(Collections.singleton(5L), currentSnapshotOffsets());
        map.forEach((path3, bArr) -> {
        });
        this.stateManager.truncateFullyAndReloadSnapshots();
        Assertions.assertEquals(OptionalLong.of(3L), this.stateManager.latestSnapshotOffset());
        Assertions.assertEquals(Collections.singleton(3L), currentSnapshotOffsets());
    }

    @Test
    public void testFirstUnstableOffsetAfterTruncation() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 99L, true);
        Assertions.assertEquals(99L, (Long) this.stateManager.firstUnstableOffset().map(logOffsetMetadata -> {
            return Long.valueOf(logOffsetMetadata.messageOffset);
        }).orElseThrow(() -> {
            return new RuntimeException("First unstable offset should be present");
        }));
        this.stateManager.takeSnapshot();
        appendEndTxnMarker(this.stateManager, 1L, (short) 0, ControlRecordType.COMMIT, 105L);
        this.stateManager.onHighWatermarkUpdated(106L);
        Assertions.assertEquals(Optional.empty(), this.stateManager.firstUnstableOffset().map(logOffsetMetadata2 -> {
            return Long.valueOf(logOffsetMetadata2.messageOffset);
        }));
        this.stateManager.takeSnapshot();
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 106L, false);
        this.stateManager.truncateAndReload(0L, 106L, this.time.milliseconds());
        Assertions.assertEquals(Optional.empty(), this.stateManager.firstUnstableOffset().map(logOffsetMetadata3 -> {
            return Long.valueOf(logOffsetMetadata3.messageOffset);
        }));
        this.stateManager.truncateAndReload(0L, 100L, this.time.milliseconds());
        Assertions.assertEquals(99L, (Long) this.stateManager.firstUnstableOffset().map(logOffsetMetadata4 -> {
            return Long.valueOf(logOffsetMetadata4.messageOffset);
        }).orElseThrow(() -> {
            return new RuntimeException("First unstable offset should be present");
        }));
    }

    @Test
    public void testLoadFromSnapshotRetainsNonExpiredProducers() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, false);
        appendClientEntry(this.stateManager, 2L, (short) 0, 0, 1L, false);
        this.stateManager.takeSnapshot();
        Assertions.assertEquals(2, this.stateManager.activeProducers().size());
        this.stateManager.truncateAndReload(1L, 2L, this.time.milliseconds());
        Assertions.assertEquals(2, this.stateManager.activeProducers().size());
        ProducerStateEntry lastEntryOrElseThrownByProducerId = getLastEntryOrElseThrownByProducerId(this.stateManager, 1L);
        Assertions.assertEquals(0, lastEntryOrElseThrownByProducerId.lastSeq());
        Assertions.assertEquals(0L, lastEntryOrElseThrownByProducerId.lastDataOffset());
        ProducerStateEntry lastEntryOrElseThrownByProducerId2 = getLastEntryOrElseThrownByProducerId(this.stateManager, 2L);
        Assertions.assertEquals(0, lastEntryOrElseThrownByProducerId2.lastSeq());
        Assertions.assertEquals(1L, lastEntryOrElseThrownByProducerId2.lastDataOffset());
    }

    @Test
    public void testSkipSnapshotIfOffsetUnchanged() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, 0L, false);
        this.stateManager.takeSnapshot();
        Assertions.assertEquals(1, ((File[]) Objects.requireNonNull(this.logDir.listFiles())).length);
        Assertions.assertEquals(Collections.singleton(1L), currentSnapshotOffsets());
        this.stateManager.takeSnapshot();
        Assertions.assertEquals(1, ((File[]) Objects.requireNonNull(this.logDir.listFiles())).length);
        Assertions.assertEquals(Collections.singleton(1L), currentSnapshotOffsets());
    }

    @Test
    public void testPidExpirationTimeout() {
        appendClientEntry(this.stateManager, 1L, (short) 5, 0, 1L, false);
        this.time.sleep(this.producerStateManagerConfig.producerIdExpirationMs() + 1);
        this.stateManager.removeExpiredProducers(this.time.milliseconds());
        appendClientEntry(this.stateManager, 1L, (short) 5, 1, 2L, false);
        Assertions.assertEquals(1, this.stateManager.activeProducers().size());
        Assertions.assertEquals(1, ((ProducerStateEntry) this.stateManager.activeProducers().values().iterator().next()).lastSeq());
        Assertions.assertEquals(3L, this.stateManager.mapEndOffset());
    }

    @Test
    public void testFirstUnstableOffset() {
        Assertions.assertEquals(OptionalLong.empty(), this.stateManager.firstUndecidedOffset());
        appendClientEntry(this.stateManager, 1L, (short) 5, 0, 99L, true);
        Assertions.assertEquals(OptionalLong.of(99L), this.stateManager.firstUndecidedOffset());
        Assertions.assertEquals(Optional.of(99L), this.stateManager.firstUnstableOffset().map(logOffsetMetadata -> {
            return Long.valueOf(logOffsetMetadata.messageOffset);
        }));
        appendClientEntry(this.stateManager, 2L, (short) 5, 0, 105L, true);
        Assertions.assertEquals(OptionalLong.of(99L), this.stateManager.firstUndecidedOffset());
        Assertions.assertEquals(Optional.of(99L), this.stateManager.firstUnstableOffset().map(logOffsetMetadata2 -> {
            return Long.valueOf(logOffsetMetadata2.messageOffset);
        }));
        appendEndTxnMarker(this.stateManager, 1L, (short) 5, ControlRecordType.COMMIT, 109L);
        Assertions.assertEquals(OptionalLong.of(105L), this.stateManager.firstUndecidedOffset());
        Assertions.assertEquals(Optional.of(99L), this.stateManager.firstUnstableOffset().map(logOffsetMetadata3 -> {
            return Long.valueOf(logOffsetMetadata3.messageOffset);
        }));
        this.stateManager.onHighWatermarkUpdated(100L);
        Assertions.assertEquals(Optional.of(99L), this.stateManager.firstUnstableOffset().map(logOffsetMetadata4 -> {
            return Long.valueOf(logOffsetMetadata4.messageOffset);
        }));
        this.stateManager.onHighWatermarkUpdated(110L);
        Assertions.assertEquals(Optional.of(105L), this.stateManager.firstUnstableOffset().map(logOffsetMetadata5 -> {
            return Long.valueOf(logOffsetMetadata5.messageOffset);
        }));
        appendEndTxnMarker(this.stateManager, 2L, (short) 5, ControlRecordType.ABORT, 112L);
        Assertions.assertFalse(this.stateManager.firstUndecidedOffset().isPresent());
        Assertions.assertEquals(Optional.of(105L), this.stateManager.firstUnstableOffset().map(logOffsetMetadata6 -> {
            return Long.valueOf(logOffsetMetadata6.messageOffset);
        }));
        this.stateManager.onHighWatermarkUpdated(113L);
        Assertions.assertFalse(this.stateManager.firstUnstableOffset().map(logOffsetMetadata7 -> {
            return Long.valueOf(logOffsetMetadata7.messageOffset);
        }).isPresent());
    }

    @Test
    public void testProducersWithOngoingTransactionsDontExpire() {
        appendClientEntry(this.stateManager, 1L, (short) 5, 0, 99L, true);
        Assertions.assertEquals(OptionalLong.of(99L), this.stateManager.firstUndecidedOffset());
        this.time.sleep(this.producerStateManagerConfig.producerIdExpirationMs() + 1);
        this.stateManager.removeExpiredProducers(this.time.milliseconds());
        Assertions.assertTrue(this.stateManager.lastEntry(1L).isPresent());
        Assertions.assertEquals(OptionalLong.of(99L), this.stateManager.firstUndecidedOffset());
        this.stateManager.removeExpiredProducers(this.time.milliseconds());
        Assertions.assertTrue(this.stateManager.lastEntry(1L).isPresent());
    }

    @Test
    public void testSequenceNotValidatedForGroupMetadataTopic() throws IOException {
        ProducerStateManager producerStateManager = new ProducerStateManager(new TopicPartition("__consumer_offsets", 0), this.logDir, 300000, this.producerStateManagerConfig, this.time);
        appendEntry(producerStateManager, 1L, (short) 0, -1, 99L, this.time.milliseconds(), AppendOrigin.COORDINATOR, true);
        appendEntry(producerStateManager, 1L, (short) 0, -1, 100L, this.time.milliseconds(), AppendOrigin.COORDINATOR, true);
    }

    @Test
    public void testOldEpochForControlRecord() {
        Assertions.assertFalse(this.stateManager.firstUndecidedOffset().isPresent());
        appendClientEntry(this.stateManager, 1L, (short) 5, 0, 99L, true);
        Assertions.assertThrows(InvalidProducerEpochException.class, () -> {
            appendEndTxnMarker(this.stateManager, 1L, (short) 3, ControlRecordType.COMMIT, 100L);
        });
    }

    @Test
    public void testCoordinatorFencing() {
        short s = 5;
        appendClientEntry(this.stateManager, 1L, (short) 5, 0, 99L, true);
        appendEndTxnMarker(this.stateManager, 1L, (short) 5, ControlRecordType.COMMIT, 100L, 1, this.time.milliseconds());
        Assertions.assertEquals(1, getLastEntryOrElseThrownByProducerId(this.stateManager, 1L).coordinatorEpoch());
        appendEndTxnMarker(this.stateManager, 1L, (short) 5, ControlRecordType.COMMIT, 101L, 1, this.time.milliseconds());
        appendEndTxnMarker(this.stateManager, 1L, (short) 5, ControlRecordType.COMMIT, 102L, 2, this.time.milliseconds());
        Assertions.assertThrows(TransactionCoordinatorFencedException.class, () -> {
            appendEndTxnMarker(this.stateManager, 1L, s, ControlRecordType.COMMIT, 103L, 1, this.time.milliseconds());
        });
    }

    @Test
    public void testCoordinatorFencedAfterReload() throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 99L, true);
        appendEndTxnMarker(this.stateManager, 1L, (short) 0, ControlRecordType.COMMIT, 100L, 1, this.time.milliseconds());
        this.stateManager.takeSnapshot();
        new ProducerStateManager(this.partition, this.logDir, 300000, this.producerStateManagerConfig, this.time).truncateAndReload(0L, 2L, 70000L);
        Assertions.assertThrows(TransactionCoordinatorFencedException.class, () -> {
            appendEndTxnMarker(this.stateManager, 1L, (short) 0, ControlRecordType.COMMIT, 100L);
        });
    }

    @Test
    public void testLoadFromEmptySnapshotFile() throws IOException {
        testLoadFromCorruptSnapshot(fileChannel -> {
            try {
                fileChannel.truncate(0L);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Test
    public void testLoadFromTruncatedSnapshotFile() throws IOException {
        testLoadFromCorruptSnapshot(fileChannel -> {
            try {
                Assertions.assertTrue(fileChannel.size() > 2);
                fileChannel.truncate(fileChannel.size() / 2);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Test
    public void testLoadFromCorruptSnapshotFile() throws IOException {
        testLoadFromCorruptSnapshot(fileChannel -> {
            try {
                Assertions.assertTrue(fileChannel.size() > 2);
                fileChannel.write(ByteBuffer.wrap(new byte[37]));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Test
    public void testAppendEmptyControlBatch() {
        RecordBatch recordBatch = (RecordBatch) Mockito.mock(RecordBatch.class);
        Mockito.when(Boolean.valueOf(recordBatch.isControlBatch())).thenReturn(true);
        Mockito.when(recordBatch.iterator()).thenReturn(Collections.emptyIterator());
        append(this.stateManager, 23423L, 15, recordBatch);
        Assertions.assertEquals(OptionalLong.empty(), getLastEntryOrElseThrownByProducerId(this.stateManager, 23423L).currentTxnFirstOffset());
    }

    @Test
    public void testRemoveStraySnapshotsKeepCleanShutdownSnapshot() throws IOException {
        Files.createFile(LogFileUtils.producerSnapshotFile(this.logDir, 5L).toPath(), new FileAttribute[0]);
        Files.createFile(LogFileUtils.producerSnapshotFile(this.logDir, 2L).toPath(), new FileAttribute[0]);
        Files.createFile(LogFileUtils.producerSnapshotFile(this.logDir, 42L).toPath(), new FileAttribute[0]);
        this.stateManager.removeStraySnapshots(Collections.singletonList(5L));
        Assertions.assertEquals(OptionalLong.of(42L), this.stateManager.latestSnapshotOffset());
        Assertions.assertEquals(OptionalLong.of(5L), this.stateManager.oldestSnapshotOffset());
        Assertions.assertEquals(Arrays.asList(5L, 42L), ProducerStateManager.listSnapshotFiles(this.logDir).stream().map(snapshotFile -> {
            return Long.valueOf(snapshotFile.offset);
        }).sorted().collect(Collectors.toList()));
    }

    @Test
    public void testRemoveAllStraySnapshots() throws IOException {
        Files.createFile(LogFileUtils.producerSnapshotFile(this.logDir, 5L).toPath(), new FileAttribute[0]);
        Files.createFile(LogFileUtils.producerSnapshotFile(this.logDir, 2L).toPath(), new FileAttribute[0]);
        Files.createFile(LogFileUtils.producerSnapshotFile(this.logDir, 42L).toPath(), new FileAttribute[0]);
        this.stateManager.removeStraySnapshots(Collections.singletonList(42L));
        Assertions.assertEquals(Collections.singletonList(42L), ProducerStateManager.listSnapshotFiles(this.logDir).stream().map(snapshotFile -> {
            return Long.valueOf(snapshotFile.offset);
        }).sorted().collect(Collectors.toList()));
    }

    @Test
    public void testRemoveAndMarkSnapshotForDeletion() throws IOException {
        Files.createFile(LogFileUtils.producerSnapshotFile(this.logDir, 5L).toPath(), new FileAttribute[0]);
        ProducerStateManager producerStateManager = new ProducerStateManager(this.partition, this.logDir, 300000, this.producerStateManagerConfig, this.time);
        Assertions.assertTrue(producerStateManager.latestSnapshotOffset().isPresent());
        Optional removeAndMarkSnapshotForDeletion = producerStateManager.removeAndMarkSnapshotForDeletion(5L);
        Assertions.assertTrue(removeAndMarkSnapshotForDeletion.isPresent());
        Assertions.assertTrue(((SnapshotFile) removeAndMarkSnapshotForDeletion.get()).file().toPath().toString().endsWith(".deleted"));
        Assertions.assertFalse(producerStateManager.latestSnapshotOffset().isPresent());
    }

    @Test
    public void testRemoveAndMarkSnapshotForDeletionAlreadyDeleted() throws IOException {
        File producerSnapshotFile = LogFileUtils.producerSnapshotFile(this.logDir, 5L);
        Files.createFile(producerSnapshotFile.toPath(), new FileAttribute[0]);
        ProducerStateManager producerStateManager = new ProducerStateManager(this.partition, this.logDir, 300000, this.producerStateManagerConfig, this.time);
        Assertions.assertTrue(producerStateManager.latestSnapshotOffset().isPresent());
        Files.delete(producerSnapshotFile.toPath());
        Assertions.assertFalse(producerStateManager.removeAndMarkSnapshotForDeletion(5L).isPresent());
        Assertions.assertFalse(producerStateManager.latestSnapshotOffset().isPresent());
    }

    @Test
    public void testEntryForVerification() {
        VerificationGuard verificationGuard = this.stateManager.maybeCreateVerificationStateEntry(1L, 0, (short) 0).verificationGuard();
        VerificationStateEntry maybeCreateVerificationStateEntry = this.stateManager.maybeCreateVerificationStateEntry(1L, 0, (short) 0);
        VerificationStateEntry verificationStateEntry = this.stateManager.verificationStateEntry(1L);
        Assertions.assertEquals(verificationGuard, verificationStateEntry.verificationGuard());
        Assertions.assertEquals(verificationStateEntry.verificationGuard(), maybeCreateVerificationStateEntry.verificationGuard());
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, true);
        this.stateManager.clearVerificationStateEntry(1L);
        Assertions.assertNull(this.stateManager.verificationStateEntry(1L));
    }

    @Test
    public void testSequenceAndEpochInVerificationEntry() {
        VerificationStateEntry maybeCreateVerificationStateEntry = this.stateManager.maybeCreateVerificationStateEntry(1L, 1, (short) 0);
        VerificationGuard verificationGuard = maybeCreateVerificationStateEntry.verificationGuard();
        verifyEntry(verificationGuard, maybeCreateVerificationStateEntry, 1, (short) 0);
        verifyEntry(verificationGuard, this.stateManager.maybeCreateVerificationStateEntry(1L, 0, (short) 0), 0, (short) 0);
        verifyEntry(verificationGuard, this.stateManager.maybeCreateVerificationStateEntry(1L, 2, (short) 1), 2, (short) 1);
        verifyEntry(verificationGuard, this.stateManager.maybeCreateVerificationStateEntry(1L, 0, (short) 0), 2, (short) 1);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testThrowOutOfOrderSequenceWithVerificationSequenceCheck(boolean z) {
        VerificationStateEntry maybeCreateVerificationStateEntry = this.stateManager.maybeCreateVerificationStateEntry(1L, 0, (short) 0);
        if (z) {
            this.producerStateManagerConfig.setTransactionVerificationEnabled(false);
        }
        Assertions.assertThrows(OutOfOrderSequenceException.class, () -> {
            appendClientEntry(this.stateManager, 1L, (short) 0, 4, 0L, true);
        });
        Assertions.assertEquals(maybeCreateVerificationStateEntry, this.stateManager.verificationStateEntry(1L));
    }

    @Test
    public void testVerificationStateEntryExpiration() {
        VerificationStateEntry maybeCreateVerificationStateEntry = this.stateManager.maybeCreateVerificationStateEntry(1L, 0, (short) 0);
        this.time.sleep(this.producerStateManagerConfig.producerIdExpirationMs() / 2);
        this.stateManager.removeExpiredProducers(this.time.milliseconds());
        Assertions.assertEquals(maybeCreateVerificationStateEntry, this.stateManager.verificationStateEntry(1L));
        this.time.sleep((this.producerStateManagerConfig.producerIdExpirationMs() / 2) + 1);
        this.stateManager.removeExpiredProducers(this.time.milliseconds());
        Assertions.assertNull(this.stateManager.verificationStateEntry(1L));
    }

    @Test
    public void testLastStableOffsetCompletedTxn() {
        beginTransaction(1L, 992342L, 990000L);
        long j = 992342 + 25;
        beginTransaction(2L, j, 990000L);
        long j2 = 992342 + 57;
        beginTransaction(3L, j2, 990000L);
        long j3 = j2 + 15;
        CompletedTxn completedTxn = new CompletedTxn(1L, 992342L, j3, false);
        Assertions.assertEquals(j, this.stateManager.lastStableOffset(completedTxn));
        this.stateManager.completeTxn(completedTxn);
        this.stateManager.onHighWatermarkUpdated(j3 + 1);
        Assertions.assertEquals(Optional.of(Long.valueOf(j)), this.stateManager.firstUnstableOffset().map(logOffsetMetadata -> {
            return Long.valueOf(logOffsetMetadata.messageOffset);
        }));
        long j4 = j3 + 20;
        CompletedTxn completedTxn2 = new CompletedTxn(3L, j2, j4, false);
        Assertions.assertEquals(j, this.stateManager.lastStableOffset(completedTxn2));
        this.stateManager.completeTxn(completedTxn2);
        this.stateManager.onHighWatermarkUpdated(j4 + 1);
        Assertions.assertEquals(Optional.of(Long.valueOf(j)), this.stateManager.firstUnstableOffset().map(logOffsetMetadata2 -> {
            return Long.valueOf(logOffsetMetadata2.messageOffset);
        }));
        long j5 = j4 + 78;
        CompletedTxn completedTxn3 = new CompletedTxn(2L, j, j5, false);
        Assertions.assertEquals(j5 + 1, this.stateManager.lastStableOffset(completedTxn3));
        this.stateManager.completeTxn(completedTxn3);
        this.stateManager.onHighWatermarkUpdated(j5 + 1);
        Assertions.assertEquals(Optional.empty(), this.stateManager.firstUnstableOffset());
    }

    @Test
    public void testSkipEmptyTransactions() {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ProducerAppendInfo prepareUpdate = this.stateManager.prepareUpdate(1L, AppendOrigin.CLIENT);
        appendData(16L, 20L, prepareUpdate, atomicInteger);
        assertTxnMetadataEquals(new TxnMetadata(1L, 16L), (TxnMetadata) prepareUpdate.startedTransactions().get(0));
        this.stateManager.update(prepareUpdate);
        this.stateManager.onHighWatermarkUpdated(21L);
        Assertions.assertEquals(Optional.of(new LogOffsetMetadata(16L)), this.stateManager.firstUnstableOffset());
        ProducerAppendInfo prepareUpdate2 = this.stateManager.prepareUpdate(1L, AppendOrigin.CLIENT);
        Optional<CompletedTxn> appendEndTransaction = appendEndTransaction(ControlRecordType.COMMIT, 21L, prepareUpdate2);
        Assertions.assertTrue(appendEndTransaction.isPresent());
        Assertions.assertEquals(Optional.of(new CompletedTxn(1L, 16L, 21L, false)), appendEndTransaction);
        Assertions.assertFalse(appendEndTransaction(ControlRecordType.COMMIT, 22L, prepareUpdate2).isPresent());
        Assertions.assertFalse(appendEndTransaction(ControlRecordType.ABORT, 23L, prepareUpdate2).isPresent());
        appendData(24L, 27L, prepareUpdate2, atomicInteger);
        Optional<CompletedTxn> appendEndTransaction2 = appendEndTransaction(ControlRecordType.ABORT, 28L, prepareUpdate2);
        Assertions.assertTrue(appendEndTransaction2.isPresent());
        Assertions.assertFalse(appendEndTransaction(ControlRecordType.ABORT, 29L, prepareUpdate2).isPresent());
        appendData(30L, 31L, prepareUpdate2, atomicInteger);
        int size = prepareUpdate2.startedTransactions().size();
        Assertions.assertEquals(2, size);
        assertTxnMetadataEquals(new TxnMetadata(1L, new LogOffsetMetadata(24L)), (TxnMetadata) prepareUpdate2.startedTransactions().get(0));
        assertTxnMetadataEquals(new TxnMetadata(1L, new LogOffsetMetadata(30L)), (TxnMetadata) prepareUpdate2.startedTransactions().get(size - 1));
        this.stateManager.update(prepareUpdate2);
        this.stateManager.completeTxn(appendEndTransaction.get());
        this.stateManager.completeTxn(appendEndTransaction2.get());
        this.stateManager.onHighWatermarkUpdated(32L);
        Assertions.assertEquals(Optional.of(new LogOffsetMetadata(30L)), this.stateManager.firstUnstableOffset());
    }

    @Test
    public void testReadWriteSnapshot() throws IOException {
        HashMap hashMap = new HashMap();
        hashMap.put(1L, new ProducerStateEntry(1L, (short) 2, 3, -1L, OptionalLong.of(100L), Optional.of(new BatchMetadata(1, 2L, 3, -1L))));
        hashMap.put(11L, new ProducerStateEntry(11L, (short) 12, 13, 123456L, OptionalLong.empty(), Optional.empty()));
        File file = new File(this.logDir, "testReadWriteSnapshot");
        ProducerStateManager.writeSnapshot(file, hashMap, true);
        assertEntries(hashMap, ProducerStateManager.readSnapshot(file));
    }

    private void appendEntry(ProducerStateManager producerStateManager, long j, short s, int i, long j2, long j3, AppendOrigin appendOrigin, boolean z) {
        ProducerAppendInfo prepareUpdate = producerStateManager.prepareUpdate(j, appendOrigin);
        prepareUpdate.appendDataBatch(s, i, i, j3, new LogOffsetMetadata(j2), j2, z);
        producerStateManager.update(prepareUpdate);
        producerStateManager.updateMapEndOffset(j2 + 1);
    }

    private void appendClientEntry(ProducerStateManager producerStateManager, long j, short s, int i, long j2, long j3, boolean z) {
        appendEntry(producerStateManager, j, s, i, j2, j3, AppendOrigin.CLIENT, z);
    }

    private void appendClientEntry(ProducerStateManager producerStateManager, long j, short s, int i, long j2, boolean z) {
        appendClientEntry(producerStateManager, j, s, i, j2, this.time.milliseconds(), z);
    }

    private void appendReplicationEntry(ProducerStateManager producerStateManager, short s, int i, long j, long j2) {
        appendEntry(producerStateManager, 1L, s, i, j, j2, AppendOrigin.REPLICATION, false);
    }

    private void appendReplicationEntry(ProducerStateManager producerStateManager, short s, int i, long j) {
        appendReplicationEntry(producerStateManager, s, i, j, this.time.milliseconds());
    }

    private void appendEndTxnMarker(ProducerStateManager producerStateManager, long j, short s, ControlRecordType controlRecordType, long j2, int i, long j3) {
        ProducerAppendInfo prepareUpdate = producerStateManager.prepareUpdate(j, AppendOrigin.COORDINATOR);
        Optional appendEndTxnMarker = prepareUpdate.appendEndTxnMarker(new EndTransactionMarker(controlRecordType, i), s, j2, j3);
        producerStateManager.update(prepareUpdate);
        producerStateManager.getClass();
        appendEndTxnMarker.ifPresent(producerStateManager::completeTxn);
        producerStateManager.updateMapEndOffset(j2 + 1);
    }

    private void appendEndTxnMarker(ProducerStateManager producerStateManager, long j, short s, ControlRecordType controlRecordType, long j2) {
        appendEndTxnMarker(producerStateManager, j, s, controlRecordType, j2, 0, this.time.milliseconds());
    }

    private ProducerStateEntry getLastEntryOrElseThrownByProducerId(ProducerStateManager producerStateManager, long j) {
        return (ProducerStateEntry) producerStateManager.lastEntry(j).orElseThrow(() -> {
            return new RuntimeException("This producerId:" + j + " should have last entry");
        });
    }

    private void verifyLastEntryWithTxnData(ProducerStateEntry producerStateEntry, int i, long j, long j2, long j3, OptionalLong optionalLong, ProducerAppendInfo producerAppendInfo) {
        Assertions.assertEquals((short) 0, producerStateEntry.producerEpoch());
        Assertions.assertEquals(i, producerStateEntry.firstSeq());
        Assertions.assertEquals(j, producerStateEntry.lastSeq());
        Assertions.assertEquals(j2, producerStateEntry.firstDataOffset());
        Assertions.assertEquals(j3, producerStateEntry.lastDataOffset());
        Assertions.assertEquals(optionalLong, producerStateEntry.currentTxnFirstOffset());
        assertTxnMetadataEquals(Collections.singletonList(new TxnMetadata(1L, 16L)), producerAppendInfo.startedTransactions());
    }

    private void assertTxnMetadataEquals(List<TxnMetadata> list, List<TxnMetadata> list2) {
        Assertions.assertEquals(list.size(), list2.size());
        for (int i = 0; i < list.size(); i++) {
            assertTxnMetadataEquals(list.get(i), list2.get(i));
        }
    }

    private void assertTxnMetadataEquals(TxnMetadata txnMetadata, TxnMetadata txnMetadata2) {
        Assertions.assertEquals(txnMetadata.producerId, txnMetadata2.producerId);
        Assertions.assertEquals(txnMetadata.firstOffset, txnMetadata2.firstOffset);
        Assertions.assertEquals(txnMetadata.lastOffset, txnMetadata2.lastOffset);
    }

    private Set<Long> currentSnapshotOffsets() {
        return (Set) Arrays.stream((Object[]) Objects.requireNonNull(this.logDir.listFiles())).map(LogFileUtils::offsetFromFile).collect(Collectors.toSet());
    }

    private void testLoadFromCorruptSnapshot(Consumer<FileChannel> consumer) throws IOException {
        appendClientEntry(this.stateManager, 1L, (short) 0, 0, 0L, false);
        this.stateManager.takeSnapshot();
        appendClientEntry(this.stateManager, 1L, (short) 0, 1, 1L, false);
        this.stateManager.takeSnapshot();
        OptionalLong latestSnapshotOffset = this.stateManager.latestSnapshotOffset();
        Assertions.assertTrue(latestSnapshotOffset.isPresent());
        long asLong = latestSnapshotOffset.getAsLong();
        Assertions.assertEquals(2L, asLong);
        File producerSnapshotFile = LogFileUtils.producerSnapshotFile(this.logDir, asLong);
        FileChannel open = FileChannel.open(producerSnapshotFile.toPath(), StandardOpenOption.WRITE);
        Throwable th = null;
        try {
            try {
                consumer.accept(open);
                if (open != null) {
                    if (0 != 0) {
                        try {
                            open.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        open.close();
                    }
                }
                ProducerStateManager producerStateManager = new ProducerStateManager(this.partition, this.logDir, 300000, this.producerStateManagerConfig, this.time);
                producerStateManager.truncateAndReload(0L, 20L, this.time.milliseconds());
                Assertions.assertFalse(producerSnapshotFile.exists());
                ProducerStateEntry producerStateEntry = (ProducerStateEntry) producerStateManager.activeProducers().get(1L);
                Assertions.assertNotNull(producerStateEntry);
                Assertions.assertEquals(0L, producerStateEntry.lastDataOffset());
            } finally {
            }
        } catch (Throwable th3) {
            if (open != null) {
                if (th != null) {
                    try {
                        open.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    open.close();
                }
            }
            throw th3;
        }
    }

    private void append(ProducerStateManager producerStateManager, long j, int i, RecordBatch recordBatch) {
        ProducerAppendInfo prepareUpdate = producerStateManager.prepareUpdate(j, AppendOrigin.CLIENT);
        prepareUpdate.append(recordBatch, Optional.empty());
        producerStateManager.update(prepareUpdate);
        producerStateManager.updateMapEndOffset(i + 1);
    }

    private void verifyEntry(VerificationGuard verificationGuard, VerificationStateEntry verificationStateEntry, int i, short s) {
        VerificationStateEntry verificationStateEntry2 = this.stateManager.verificationStateEntry(1L);
        Assertions.assertEquals(verificationGuard, verificationStateEntry2.verificationGuard());
        Assertions.assertEquals(verificationStateEntry2.verificationGuard(), verificationStateEntry.verificationGuard());
        Assertions.assertEquals(i, verificationStateEntry2.lowestSequence());
        Assertions.assertEquals(s, verificationStateEntry2.epoch());
    }

    private void beginTransaction(long j, long j2, long j3) {
        ProducerAppendInfo producerAppendInfo = new ProducerAppendInfo(this.partition, j, ProducerStateEntry.empty(j), AppendOrigin.CLIENT, this.stateManager.maybeCreateVerificationStateEntry(j, 0, (short) 0));
        producerAppendInfo.appendDataBatch((short) 0, 0, 0, this.time.milliseconds(), new LogOffsetMetadata(j2, j3, 50 * ((int) (j2 - j3))), j2, true);
        this.stateManager.update(producerAppendInfo);
    }

    private Optional<CompletedTxn> appendEndTransaction(ControlRecordType controlRecordType, long j, ProducerAppendInfo producerAppendInfo) {
        return producerAppendInfo.appendEndTxnMarker(new EndTransactionMarker(controlRecordType, 27), (short) 0, j, this.time.milliseconds());
    }

    private void appendData(long j, long j2, ProducerAppendInfo producerAppendInfo, AtomicInteger atomicInteger) {
        producerAppendInfo.appendDataBatch((short) 0, atomicInteger.get(), atomicInteger.addAndGet((int) (j2 - j)), this.time.milliseconds(), new LogOffsetMetadata(j), j2, true);
        atomicInteger.incrementAndGet();
    }

    private void assertEntries(Map<Long, ProducerStateEntry> map, List<ProducerStateEntry> list) {
        Map map2 = (Map) list.stream().collect(Collectors.toMap((v0) -> {
            return v0.producerId();
        }, producerStateEntry -> {
            return producerStateEntry;
        }));
        Assertions.assertEquals(map.keySet(), map2.keySet());
        map.forEach((l, producerStateEntry2) -> {
            assertProducerStateEntry(producerStateEntry2, (ProducerStateEntry) map2.get(l));
        });
    }

    private void assertProducerStateEntry(ProducerStateEntry producerStateEntry, ProducerStateEntry producerStateEntry2) {
        Assertions.assertEquals(producerStateEntry.producerId(), producerStateEntry2.producerId());
        Assertions.assertEquals(producerStateEntry.producerEpoch(), producerStateEntry2.producerEpoch());
        Assertions.assertEquals(producerStateEntry.coordinatorEpoch(), producerStateEntry2.coordinatorEpoch());
        Assertions.assertEquals(producerStateEntry.lastTimestamp(), producerStateEntry2.lastTimestamp());
        Assertions.assertEquals(producerStateEntry.currentTxnFirstOffset(), producerStateEntry2.currentTxnFirstOffset());
        Assertions.assertIterableEquals(producerStateEntry.batchMetadata(), producerStateEntry2.batchMetadata());
    }
}
