package org.apache.kafka.storage.internals.log;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataSerdeTest;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/storage/internals/log/LocalLogTest.class */
public class LocalLogTest {
    // Mock clock shared by every test; it stamps records and its scheduler is handed to the LocalLog under test.
    private static final MockTime MOCK_TIME = new MockTime();
    // Parent directory obtained from TestUtils; partition log directories are created beneath it.
    private final File tmpDir = TestUtils.tempDirectory();
    // Directory of the partition log under test (must be initialised after tmpDir, which it depends on).
    private final File logDir = TestUtils.randomPartitionLogDir(this.tmpDir);
    // Topic partition the log under test is created for.
    private final TopicPartition topicPartition = new TopicPartition("test_topic", 1);
    // Failure channel passed to the LocalLog constructor.
    // NOTE(review): 10 is presumably the channel capacity - confirm against LogDirFailureChannel.
    private final LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(10);
    // Log under test; re-created by setUp() before each test.
    private LocalLog log;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/storage/internals/log/LocalLogTest$KeyValue.class */
    /**
     * Immutable key/value pair used by the tests to build log records and to compare what is read
     * back from the log.
     *
     * <p>Declared as a real {@code record}, so the compiler supplies the canonical constructor, the
     * {@code key()} / {@code value()} accessors and value-based {@code equals}, {@code hashCode}
     * and {@code toString}.
     *
     * @param key   record key as text
     * @param value record value as text
     */
    public record KeyValue(String key, String value) {

        /**
         * Builds a {@link SimpleRecord} from this pair using the supplied timestamp.
         *
         * @param j timestamp to store in the record
         * @return a record whose key and value are the UTF-8 bytes of this pair
         */
        public SimpleRecord toRecord(long j) {
            return new SimpleRecord(j, utf8(this.key), utf8(this.value));
        }

        /**
         * Builds a {@link SimpleRecord} from this pair, stamped with the current mock time.
         *
         * @return a record whose key and value are the UTF-8 bytes of this pair
         */
        SimpleRecord toRecord() {
            return toRecord(LocalLogTest.MOCK_TIME.milliseconds());
        }

        /**
         * Decodes a log record back into a pair. A missing key or value becomes the empty string.
         *
         * @param record record read from the log
         * @return the decoded pair
         */
        static KeyValue fromRecord(Record record) {
            String decodedKey = record.hasKey() ? StandardCharsets.UTF_8.decode(record.key()).toString() : "";
            String decodedValue = record.hasValue() ? StandardCharsets.UTF_8.decode(record.value()).toString() : "";
            return new KeyValue(decodedKey, decodedValue);
        }

        // Encode with the same charset fromRecord() decodes with, independent of the platform default.
        private static byte[] utf8(String text) {
            return text.getBytes(StandardCharsets.UTF_8);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/storage/internals/log/LocalLogTest$TestDeletionReason.class */
    /**
     * {@link SegmentDeletionReason} test double that remembers the segments passed to the most
     * recent {@link #logReason(List)} call so tests can assert on them.
     */
    public static class TestDeletionReason implements SegmentDeletionReason {
        // Starts empty; replaced with a fresh copy on every logReason() call.
        private Collection<LogSegment> deletedSegments = new ArrayList<>();

        TestDeletionReason() {
        }

        /**
         * Records a copy of the given segments.
         *
         * @param list segments reported as being deleted
         */
        public void logReason(List<LogSegment> list) {
            // Copy so later changes to the caller's list do not affect what was recorded.
            this.deletedSegments = new ArrayList<>(list);
        }

        /**
         * @return the segments captured by the last {@link #logReason(List)} call, or an empty
         *         collection if it was never called
         */
        Collection<LogSegment> deletedSegments() {
            return this.deletedSegments;
        }
    }

    // Package-private no-arg constructor; all state is set up by the field initialisers and setUp().
    LocalLogTest() {
    }

    /** Creates a fresh log with default configuration and a single active segment before each test. */
    @BeforeEach
    public void setUp() throws IOException {
        LogConfig defaultConfig = new LogConfig(new Properties());
        log = createLocalLogWithActiveSegment(logDir, defaultConfig);
    }

    /** Closes the log after each test on a best-effort basis. */
    @AfterEach
    public void tearDown() throws IOException {
        try {
            log.close();
        } catch (KafkaStorageException ignored) {
            // Deliberately ignored: some tests close the handlers themselves, after which
            // close() throws (see testLogCloseFailureWhenInMemoryBufferClosed).
        }
    }

    /** Converts each pair to a {@link SimpleRecord} via {@code KeyValue.toRecord()}, preserving order. */
    private List<SimpleRecord> kvsToRecords(List<KeyValue> list) {
        return list.stream()
                .map(keyValue -> keyValue.toRecord())
                .collect(Collectors.toList());
    }

    /** Decodes every record of the iterable into a {@link KeyValue}, preserving iteration order. */
    private List<KeyValue> recordsToKvs(Iterable<Record> iterable) {
        List<KeyValue> keyValues = new ArrayList<>();
        for (Record record : iterable) {
            keyValues.add(KeyValue.fromRecord(record));
        }
        return keyValues;
    }

    /**
     * Appends the given records to the log as one uncompressed batch starting at offset {@code j}.
     *
     * @param list records to append; must not be empty because the first record's timestamp is read
     * @param j    offset assigned to the first record
     */
    private void appendRecords(List<SimpleRecord> list, long j) throws IOException {
        long lastOffset = j + list.size() - 1;
        long firstTimestamp = list.get(0).timestamp();
        MemoryRecords batch = MemoryRecords.withRecords(j, Compression.NONE, 0, list.toArray(new SimpleRecord[0]));
        log.append(lastOffset, firstTimestamp, j, batch);
    }

    /** Reads from offset {@code j}, bounded by the active segment's size and the current log end offset. */
    private FetchDataInfo readRecords(long j) throws IOException {
        int maxLength = log.segments().activeSegment().size();
        return readRecords(j, maxLength, log.logEndOffsetMetadata());
    }

    /** Reads from offset 0 with the given maximum length, bounded by the current log end offset. */
    private FetchDataInfo readRecords(int i) throws IOException {
        LogOffsetMetadata endOffsetMetadata = log.logEndOffsetMetadata();
        return readRecords(0L, i, endOffsetMetadata);
    }

    /** Reads from offset {@code j}, bounded by the active segment's size and the supplied offset metadata. */
    private FetchDataInfo readRecords(long j, LogOffsetMetadata logOffsetMetadata) throws IOException {
        int maxLength = log.segments().activeSegment().size();
        return readRecords(j, maxLength, logOffsetMetadata);
    }

    /**
     * Delegates to {@code LocalLog.read} with both boolean flags set to {@code false}.
     *
     * @param j                 offset to start reading from
     * @param i                 maximum length passed to the read
     * @param logOffsetMetadata upper-bound offset metadata passed to the read
     */
    private FetchDataInfo readRecords(long j, int i, LogOffsetMetadata logOffsetMetadata) throws IOException {
        // NOTE(review): flag names follow LocalLog.read's parameter order as I understand it - confirm.
        boolean minOneMessage = false;
        boolean includeAbortedTxns = false;
        return log.read(j, i, minOneMessage, logOffsetMetadata, includeAbortedTxns);
    }

    /** deleteAllSegments() returns every segment, empties the log and leaves the directory in place. */
    @Test
    public void testLogDeleteSegmentsSuccess() throws IOException {
        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
        appendRecords(List.of(record), 0L);
        log.roll(0L);
        Assertions.assertEquals(2, log.segments().numberOfSegments());
        Assertions.assertNotEquals(0, logDir.listFiles().length);

        // Snapshot the segments first: deleteAllSegments() removes them from the log.
        List<?> segmentsBeforeDelete = new ArrayList<>(log.segments().values());
        List<?> deletedSegments = log.deleteAllSegments();

        Assertions.assertTrue(log.segments().isEmpty());
        Assertions.assertEquals(segmentsBeforeDelete, deletedSegments);
        Assertions.assertThrows(KafkaStorageException.class, () -> log.checkIfMemoryMappedBufferClosed());
        Assertions.assertTrue(logDir.exists());
    }

    /** Rolling an empty active segment replaces it and marks the old one with the ".deleted" suffix. */
    @Test
    public void testRollEmptyActiveSegment() {
        LogSegment previousActive = log.segments().activeSegment();

        log.roll(0L);

        Assertions.assertEquals(1, log.segments().numberOfSegments());
        Assertions.assertNotEquals(previousActive, log.segments().activeSegment());
        Assertions.assertNotEquals(0, logDir.listFiles().length);
        Assertions.assertTrue(previousActive.hasSuffix(".deleted"));
    }

    /** deleteEmptyDir() throws while segments remain and removes the directory once they are deleted. */
    @Test
    public void testLogDeleteDirSuccessWhenEmptyAndFailureWhenNonEmpty() throws IOException {
        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
        appendRecords(List.of(record), 0L);
        log.roll(0L);
        Assertions.assertEquals(2, log.segments().numberOfSegments());
        Assertions.assertNotEquals(0, logDir.listFiles().length);

        // Segments still present: deletion of the directory must be refused.
        Assertions.assertThrows(IllegalStateException.class, () -> log.deleteEmptyDir());
        Assertions.assertTrue(logDir.exists());

        // After removing all segments the directory can be deleted.
        log.deleteAllSegments();
        log.deleteEmptyDir();
        Assertions.assertFalse(logDir.exists());
    }

    /** updateConfig() swaps the configuration returned by config(). */
    @Test
    public void testUpdateConfig() {
        LogConfig oldConfig = log.config();
        Assertions.assertEquals(oldConfig, log.config());

        Properties overrides = new Properties();
        overrides.put("segment.bytes", oldConfig.segmentSize + 1);
        LogConfig newConfig = new LogConfig(overrides);

        log.updateConfig(newConfig);
        Assertions.assertEquals(newConfig, log.config());
    }

    /** renameDir() moves the log and all of its segments to the new directory. */
    @Test
    public void testLogDirRenameToNewDir() throws IOException {
        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
        appendRecords(List.of(record), 0L);
        log.roll(0L);
        Assertions.assertEquals(2, log.segments().numberOfSegments());

        File newLogDir = TestUtils.randomPartitionLogDir(tmpDir);
        Assertions.assertTrue(log.renameDir(newLogDir.getName()));

        Assertions.assertFalse(logDir.exists());
        Assertions.assertTrue(newLogDir.exists());
        Assertions.assertEquals(newLogDir, log.dir());
        String expectedParent = newLogDir.getParent();
        Assertions.assertEquals(expectedParent, log.parentDir());
        Assertions.assertEquals(expectedParent, log.dir().getParent());
        // Every segment file must now live directly under the new directory.
        log.segments().values().forEach(segment ->
                Assertions.assertEquals(newLogDir.getPath(), segment.log().file().getParentFile().getPath()));
        Assertions.assertEquals(2, log.segments().numberOfSegments());
    }

    /** Renaming the log to the directory name it already has reports {@code false}. */
    @Test
    public void testLogDirRenameToExistingDir() {
        String currentName = log.dir().getName();
        Assertions.assertFalse(log.renameDir(currentName));
    }

    /** flush() followed by markFlushed() advances the recovery point and the last flush time. */
    @Test
    public void testLogFlush() throws IOException {
        Assertions.assertEquals(0L, log.recoveryPoint());
        Assertions.assertEquals(MOCK_TIME.milliseconds(), log.lastFlushTime());

        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
        appendRecords(List.of(record), 0L);
        // Advance the mock clock so an updated flush time is distinguishable from the initial one.
        MOCK_TIME.sleep(1L);
        LogSegment newSegment = log.roll(0L);
        long flushOffset = newSegment.baseOffset();
        log.flush(flushOffset);
        log.markFlushed(flushOffset);

        Assertions.assertEquals(1L, log.recoveryPoint());
        Assertions.assertEquals(MOCK_TIME.milliseconds(), log.lastFlushTime());
    }

    /** Appended records can be read back in order; the recovery point is not moved by an append. */
    @Test
    public void testLogAppend() throws IOException {
        // Nothing has been written yet, so a read returns no records.
        Assertions.assertFalse(readRecords(1).records.records().iterator().hasNext());
        MOCK_TIME.sleep(1L);

        List<KeyValue> keyValues = List.of(new KeyValue("abc", "ABC"), new KeyValue("de", "DE"));
        appendRecords(kvsToRecords(keyValues), 0L);
        Assertions.assertEquals(2L, log.logEndOffset());
        Assertions.assertEquals(0L, log.recoveryPoint());

        FetchDataInfo fetched = readRecords(0L);
        Assertions.assertEquals(2L, Utils.toList(fetched.records.records()).size());
        Assertions.assertEquals(keyValues, recordsToKvs(fetched.records.records()));
    }

    /** After close(), appending fails with {@link ClosedChannelException}. */
    @Test
    public void testLogCloseSuccess() throws IOException {
        List<KeyValue> keyValues = List.of(new KeyValue("abc", "ABC"), new KeyValue("de", "DE"));
        appendRecords(kvsToRecords(keyValues), 0L);

        log.close();

        Assertions.assertThrows(ClosedChannelException.class, () -> appendRecords(kvsToRecords(keyValues), 2L));
    }

    /** Calling close() twice must not throw. */
    @Test
    public void testLogCloseIdempotent() {
        for (int attempt = 0; attempt < 2; attempt++) {
            log.close();
        }
    }

    /** close() after closeHandlers() fails with {@link KafkaStorageException}. */
    @Test
    public void testLogCloseFailureWhenInMemoryBufferClosed() throws IOException {
        List<KeyValue> keyValues = List.of(new KeyValue("abc", "ABC"), new KeyValue("de", "DE"));
        appendRecords(kvsToRecords(keyValues), 0L);

        log.closeHandlers();

        Assertions.assertThrows(KafkaStorageException.class, () -> log.close());
    }

    /** After closeHandlers(), appending fails with {@link ClosedChannelException}. */
    @Test
    public void testLogCloseHandlers() throws IOException {
        List<KeyValue> keyValues = List.of(new KeyValue("abc", "ABC"), new KeyValue("de", "DE"));
        appendRecords(kvsToRecords(keyValues), 0L);

        log.closeHandlers();

        Assertions.assertThrows(ClosedChannelException.class, () -> appendRecords(kvsToRecords(keyValues), 2L));
    }

    /** Calling closeHandlers() twice must not throw. */
    @Test
    public void testLogCloseHandlersIdempotent() {
        for (int attempt = 0; attempt < 2; attempt++) {
            log.closeHandlers();
        }
    }

    /**
     * Shared body for the sync/async removeAndDeleteSegments tests: builds ten segments, removes
     * them all and checks that they were reported to the deletion reason and deleted.
     *
     * @param z {@code true} to request asynchronous deletion, {@code false} for synchronous
     */
    private void testRemoveAndDeleteSegments(boolean z) throws IOException {
        // Nine single-record segments plus the trailing empty active segment.
        for (int offset = 0; offset < 9; offset++) {
            SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
            appendRecords(List.of(record), offset);
            log.roll(0L);
        }
        Assertions.assertEquals(10, log.segments().numberOfSegments());

        TestDeletionReason reason = new TestDeletionReason();
        // Typed copy: a raw list would make the lambda parameter below an Object.
        List<LogSegment> toDelete = new ArrayList<>(log.segments().values());
        log.removeAndDeleteSegments(toDelete, z, reason);
        if (z) {
            // Let the mock scheduler run the delayed file deletion.
            MOCK_TIME.sleep(log.config().fileDeleteDelayMs + 1);
        }

        Assertions.assertTrue(log.segments().isEmpty());
        Assertions.assertEquals(toDelete, reason.deletedSegments());
        toDelete.forEach(segment -> Assertions.assertTrue(segment.deleted()));
    }

    /** Runs the remove-and-delete scenario with synchronous deletion. */
    @Test
    public void testRemoveAndDeleteSegmentsSync() throws IOException {
        boolean asyncDelete = false;
        testRemoveAndDeleteSegments(asyncDelete);
    }

    /** Runs the remove-and-delete scenario with asynchronous deletion. */
    @Test
    public void testRemoveAndDeleteSegmentsAsync() throws IOException {
        boolean asyncDelete = true;
        testRemoveAndDeleteSegments(asyncDelete);
    }

    /**
     * Shared body for the sync/async deleteSegmentFiles tests: builds ten segments and deletes
     * their files through the static {@code LocalLog.deleteSegmentFiles} helper.
     *
     * @param z {@code true} to request asynchronous deletion, {@code false} for synchronous
     */
    private void testDeleteSegmentFiles(boolean z) throws IOException {
        // Nine single-record segments plus the trailing empty active segment.
        for (int offset = 0; offset < 9; offset++) {
            SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
            appendRecords(List.of(record), offset);
            log.roll(0L);
        }
        Assertions.assertEquals(10, log.segments().numberOfSegments());

        // Typed reference: a raw collection would make the lambda parameters below Objects.
        Collection<LogSegment> toDelete = log.segments().values();
        LocalLog.deleteSegmentFiles(toDelete, z, log.dir(), log.topicPartition(), log.config(),
                log.scheduler(), log.logDirFailureChannel(), "");
        if (z) {
            // Before the delay elapses the files are only renamed, not yet deleted.
            toDelete.forEach(segment -> {
                Assertions.assertFalse(segment.deleted());
                Assertions.assertTrue(segment.hasSuffix(".deleted"));
            });
            // Let the mock scheduler run the delayed file deletion.
            MOCK_TIME.sleep(log.config().fileDeleteDelayMs + 1);
        }
        toDelete.forEach(segment -> Assertions.assertTrue(segment.deleted()));
    }

    /** Runs the delete-segment-files scenario with synchronous deletion. */
    @Test
    public void testDeleteSegmentFilesSync() throws IOException {
        boolean asyncDelete = false;
        testDeleteSegmentFiles(asyncDelete);
    }

    /** Runs the delete-segment-files scenario with asynchronous deletion. */
    @Test
    public void testDeleteSegmentFilesAsync() throws IOException {
        boolean asyncDelete = true;
        testDeleteSegmentFiles(asyncDelete);
    }

    /** createAndDeleteSegment() replaces the active segment with a new empty one at the given offset. */
    @Test
    public void testCreateAndDeleteSegment() throws IOException {
        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
        appendRecords(List.of(record), 0L);

        LogSegment oldActiveSegment = log.segments().activeSegment();
        long newOffset = oldActiveSegment.baseOffset() + 1;
        // NOTE(review): the boolean is presumably the async-delete flag - confirm against LocalLog.
        LogSegment newSegment = log.createAndDeleteSegment(newOffset, oldActiveSegment, true,
                new LogTruncation(log.logger()));

        Assertions.assertEquals(1, log.segments().numberOfSegments());
        LogSegment currentActive = log.segments().activeSegment();
        Assertions.assertEquals(newSegment, currentActive);
        Assertions.assertNotEquals(oldActiveSegment, currentActive);
        Assertions.assertTrue(oldActiveSegment.hasSuffix(".deleted"));
        Assertions.assertEquals(newOffset, currentActive.baseOffset());
        Assertions.assertEquals(0L, log.recoveryPoint());
        Assertions.assertEquals(newOffset, log.logEndOffset());
        Assertions.assertFalse(readRecords(newOffset).records.records().iterator().hasNext());
    }

    /** truncateFullyAndStartAt() returns all old segments and restarts the log at the new offset. */
    @Test
    public void testTruncateFullyAndStartAt() throws IOException {
        SimpleRecord sharedRecord = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
        // Offsets 0..7: roll after every odd offset, giving four two-record segments.
        for (int offset = 0; offset < 8; offset++) {
            appendRecords(List.of(sharedRecord), offset);
            boolean oddOffset = offset % 2 != 0;
            if (oddOffset) {
                log.roll(0L);
            }
        }
        // Offsets 8..12 all land in the fifth (active) segment.
        for (int offset = 8; offset < 13; offset++) {
            SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
            appendRecords(List.of(record), offset);
        }
        Assertions.assertEquals(5, log.segments().numberOfSegments());
        Assertions.assertNotEquals(10L, log.segments().activeSegment().baseOffset());

        // The expected list is built first, so it snapshots the segments before truncation runs.
        Assertions.assertEquals(
                new ArrayList<>(log.segments().values()),
                log.truncateFullyAndStartAt(10L));

        Assertions.assertEquals(1, log.segments().numberOfSegments());
        Assertions.assertEquals(10L, log.segments().activeSegment().baseOffset());
        Assertions.assertEquals(0L, log.recoveryPoint());
        Assertions.assertEquals(10L, log.logEndOffset());
        Assertions.assertFalse(readRecords(10L).records.records().iterator().hasNext());
    }

    /** Exercises reads whose start offset or position relates in different ways to the max offset metadata. */
    @Test
    public void testWhenFetchOffsetHigherThanMaxOffset() throws IOException {
        SimpleRecord sharedRecord = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
        // Offsets 0..4 with a roll after every odd offset: segments start at 0, 2 and 4.
        for (int offset = 0; offset < 5; offset++) {
            appendRecords(List.of(sharedRecord), offset);
            boolean oddOffset = offset % 2 != 0;
            if (oddOffset) {
                log.roll(0L);
            }
        }
        Assertions.assertEquals(3, log.segments().numberOfSegments());

        // Fetch offset below the max offset: one record is returned.
        FetchDataInfo belowMax = readRecords(3L, new LogOffsetMetadata(4L, 4L, 0));
        Assertions.assertEquals(1, Utils.toList(belowMax.records.records()).size());
        Assertions.assertEquals(new LogOffsetMetadata(3L, 2L, 69), belowMax.fetchOffsetMetadata);

        // Fetch offset equal to the max offset: nothing is returned.
        FetchDataInfo atMax = readRecords(4L, new LogOffsetMetadata(4L, 4L, 0));
        Assertions.assertFalse(atMax.records.records().iterator().hasNext());
        Assertions.assertEquals(new LogOffsetMetadata(4L, 4L, 0), atMax.fetchOffsetMetadata);

        // Fetch offset above the max offset: nothing is returned.
        FetchDataInfo aboveMax = readRecords(5L, new LogOffsetMetadata(4L, 4L, 0));
        Assertions.assertFalse(aboveMax.records.records().iterator().hasNext());
        Assertions.assertEquals(new LogOffsetMetadata(5L, 4L, 69), aboveMax.fetchOffsetMetadata);

        // Max offset metadata carrying -1 for segment base offset and position: nothing is returned.
        FetchDataInfo unknownMaxPosition = readRecords(3L, new LogOffsetMetadata(4L, -1L, -1));
        Assertions.assertFalse(unknownMaxPosition.records.records().iterator().hasNext());
        Assertions.assertEquals(new LogOffsetMetadata(3L, 2L, 69), unknownMaxPosition.fetchOffsetMetadata);

        // Max offset metadata pointing into the first segment (base offset 0): nothing is returned.
        FetchDataInfo maxInEarlierSegment = readRecords(3L, new LogOffsetMetadata(4L, 0L, 40));
        Assertions.assertFalse(maxInEarlierSegment.records.records().iterator().hasNext());
        Assertions.assertEquals(new LogOffsetMetadata(3L, 2L, 69), maxInEarlierSegment.fetchOffsetMetadata);
    }

    /** truncateTo() drops later segments, trims the new active segment and allows appending again. */
    @Test
    public void testTruncateTo() throws IOException {
        // Offsets 0..11 with a roll after every third record: four full segments plus an empty active one.
        for (int offset = 0; offset < 12; offset++) {
            SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
            appendRecords(List.of(record), offset);
            boolean lastOfTriple = offset % 3 == 2;
            if (lastOfTriple) {
                log.roll(0L);
            }
        }
        Assertions.assertEquals(5, log.segments().numberOfSegments());
        Assertions.assertEquals(12L, log.logEndOffset());

        // The expected list is built first, so it snapshots the segments before truncation runs.
        Assertions.assertEquals(
                new ArrayList<>(log.segments().values(9L, log.logEndOffset() + 1)),
                log.truncateTo(7L));

        Assertions.assertEquals(3, log.segments().numberOfSegments());
        Assertions.assertEquals(6L, log.segments().activeSegment().baseOffset());
        Assertions.assertEquals(0L, log.recoveryPoint());
        Assertions.assertEquals(7L, log.logEndOffset());

        FetchDataInfo fetched = readRecords(6L);
        Assertions.assertEquals(1, Utils.toList(fetched.records.records()).size());
        Assertions.assertEquals(List.of(new KeyValue("", "a")), recordsToKvs(fetched.records.records()));

        // The log accepts new records right at the truncation point.
        SimpleRecord afterTruncation = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
        appendRecords(List.of(afterTruncation), 7L);
        Assertions.assertEquals(8L, log.logEndOffset());
    }

    /** nonActiveLogSegmentsFrom() returns the non-active segments starting at the requested offset. */
    @Test
    public void testNonActiveSegmentsFrom() throws IOException {
        // One record per segment at offsets 0..4; the active segment then starts at 5.
        for (int offset = 0; offset < 5; offset++) {
            String text = String.valueOf(offset);
            List<KeyValue> single = List.of(new KeyValue(text, text));
            appendRecords(kvsToRecords(single), offset);
            log.roll(0L);
        }

        Assertions.assertEquals(5L, log.segments().activeSegment().baseOffset());
        Assertions.assertEquals(List.of(0L, 1L, 2L, 3L, 4L), nonActiveBaseOffsetsFrom(0L));
        Assertions.assertEquals(List.of(), nonActiveBaseOffsetsFrom(5L));
        Assertions.assertEquals(List.of(2L, 3L, 4L), nonActiveBaseOffsetsFrom(2L));
        Assertions.assertEquals(List.of(), nonActiveBaseOffsetsFrom(6L));
    }

    /** Returns the base offsets of the segments reported by {@code nonActiveLogSegmentsFrom(j)}, in order. */
    private List<Long> nonActiveBaseOffsetsFrom(long j) {
        return log.segments().nonActiveLogSegmentsFrom(j).stream()
                .map(segment -> segment.baseOffset())
                .collect(Collectors.toList());
    }

    /** Joins the two parts with a single dash, e.g. {@code ("a", "1")} gives {@code "a-1"}. */
    private String topicPartitionName(String str, String str2) {
        return String.join("-", str, str2);
    }

    /** A well-formed "topic-partition" directory name is parsed into its topic and partition. */
    @Test
    public void testParseTopicPartitionName() throws IOException {
        String topic = "test_topic";
        String partition = "143";
        File dir = new File(logDir, topicPartitionName(topic, partition));

        TopicPartition parsed = LocalLog.parseTopicPartitionName(dir);

        Assertions.assertEquals(topic, parsed.topic());
        Assertions.assertEquals(Integer.parseInt(partition), parsed.partition());
    }

    /** A delete-marked directory whose topic contains a period and a dash is still parsed correctly. */
    @Test
    public void testParseTopicPartitionNameWithPeriodForDeletedTopic() throws IOException {
        String topic = "foo.bar-testtopic";
        String partition = "42";
        String deleteDirName = LocalLog.logDeleteDirName(new TopicPartition(topic, Integer.parseInt(partition)));
        File dir = new File(logDir, deleteDirName);

        TopicPartition parsed = LocalLog.parseTopicPartitionName(dir);

        Assertions.assertEquals(topic, parsed.topic(), "Unexpected topic name parsed");
        Assertions.assertEquals(Integer.parseInt(partition), parsed.partition(), "Unexpected partition number parsed");
    }

    /** An empty directory name is rejected with {@link KafkaException}. */
    @Test
    public void testParseTopicPartitionNameForEmptyName() throws IOException {
        File dir = new File("");
        String failureMessage = "KafkaException should have been thrown for dir: " + dir.getCanonicalPath();
        Assertions.assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(dir), failureMessage);
    }

    /** A {@code null} directory is rejected with {@link KafkaException}. */
    @Test
    public void testParseTopicPartitionNameForNull() {
        File dir = null;
        Assertions.assertThrows(
                KafkaException.class,
                () -> LocalLog.parseTopicPartitionName(dir),
                () -> "KafkaException should have been thrown for dir: " + dir);
    }

    /** Names without the dash between topic and partition are rejected, with or without the delete marker. */
    @Test
    public void testParseTopicPartitionNameForMissingSeparator() throws IOException {
        String topic = "test_topic";
        String partition = "1999";

        File dir = new File(logDir, topic + partition);
        String dirMessage = "KafkaException should have been thrown for dir: " + dir.getCanonicalPath();
        Assertions.assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(dir), dirMessage);

        // Same name carrying the "-delete" marker.
        File deleteMarkerDir = new File(logDir, topic + partition + ".-delete");
        String markerMessage = "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath();
        Assertions.assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(deleteMarkerDir), markerMessage);
    }

    /** Names with an empty topic are rejected, for both plain and delete-marked directories. */
    @Test
    public void testParseTopicPartitionNameForMissingTopic() throws IOException {
        String topic = "";
        String partition = "1999";

        File dir = new File(logDir, topicPartitionName(topic, partition));
        String dirMessage = "KafkaException should have been thrown for dir: " + dir.getCanonicalPath();
        Assertions.assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(dir), dirMessage);

        // Same topic/partition through the delete-directory naming scheme.
        String deleteDirName = LocalLog.logDeleteDirName(new TopicPartition(topic, Integer.parseInt(partition)));
        File deleteMarkerDir = new File(logDir, deleteDirName);
        String markerMessage = "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath();
        Assertions.assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(deleteMarkerDir), markerMessage);
    }

    /** Names with an empty partition are rejected, for both plain and delete-marked directories. */
    @Test
    public void testParseTopicPartitionNameForMissingPartition() throws IOException {
        String topic = "test_topic";
        String partition = "";

        // Build the directory as a child of logDir (as the sibling tests do) so the parsed name is
        // exactly "test_topic-" rather than that string glued onto logDir's own name.
        File dir = new File(logDir, topicPartitionName(topic, partition));
        String dirMessage = "KafkaException should have been thrown for dir: " + dir.getCanonicalPath();
        Assertions.assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(dir), dirMessage);

        // Same name carrying the "-delete" marker.
        File deleteMarkerDir = new File(logDir, topicPartitionName(topic, partition) + ".-delete");
        String markerMessage = "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath();
        Assertions.assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(deleteMarkerDir), markerMessage);
    }

    /** Names whose partition part is not a number are rejected. */
    @Test
    public void testParseTopicPartitionNameForInvalidPartition() throws IOException {
        String topic = "test_topic";
        String partition = "1999a";

        File dir = new File(logDir, topicPartitionName(topic, partition));
        String dirMessage = "KafkaException should have been thrown for dir: " + dir.getCanonicalPath();
        Assertions.assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(dir), dirMessage);

        // Delete-marker variant; note that this name is built without the dash separator.
        File deleteMarkerDir = new File(logDir, topic + partition + ".-delete");
        String markerMessage = "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath();
        Assertions.assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(deleteMarkerDir), markerMessage);
    }

    /** Directory names that do not follow the Kafka naming scheme are rejected. */
    @Test
    public void testParseTopicPartitionNameForExistingInvalidDir() throws IOException {
        String basePath = logDir.getPath();

        File plainDir = new File(basePath + "/non_kafka_dir");
        String plainMessage = "KafkaException should have been thrown for dir: " + plainDir.getCanonicalPath();
        Assertions.assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(plainDir), plainMessage);

        File deleteSuffixedDir = new File(basePath + "/non_kafka_dir-delete");
        String suffixedMessage = "KafkaException should have been thrown for dir: " + deleteSuffixedDir.getCanonicalPath();
        Assertions.assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(deleteSuffixedDir), suffixedMessage);
    }

    /**
     * logDeleteDirName() produces "topic-partition.uniqueId-delete" names that match the delete
     * pattern, do not match the future pattern, and never exceed 255 characters.
     */
    @Test
    public void testLogDeleteDirName() {
        // Literal topic: the regex below hard-codes "foo", so the topic must not come from a
        // constant owned by an unrelated test class.
        String shortName = LocalLog.logDeleteDirName(new TopicPartition("foo", 3));
        Assertions.assertTrue(shortName.length() <= 255);
        Assertions.assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(shortName).matches());
        Assertions.assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(shortName).matches());
        Assertions.assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(shortName).matches());

        // A 249-character topic must be shortened so that the whole name is exactly 255 characters.
        String longTopic = "n" + "o".repeat(248);
        String truncatedName = LocalLog.logDeleteDirName(new TopicPartition(longTopic, 5));
        Assertions.assertEquals(255, truncatedName.length());
        Assertions.assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(truncatedName).matches());
        Assertions.assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(truncatedName).matches());
        Assertions.assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(truncatedName).matches());
    }

    /** offsetFromFile() recovers the base offset from log, offset-index and time-index file names. */
    @Test
    public void testOffsetFromFile() {
        long offset = 23423423L;

        File logFile = LogFileUtils.logFile(tmpDir, offset);
        Assertions.assertEquals(offset, LogFileUtils.offsetFromFile(logFile));

        File offsetIndexFile = LogFileUtils.offsetIndexFile(tmpDir, offset);
        Assertions.assertEquals(offset, LogFileUtils.offsetFromFile(offsetIndexFile));

        File timeIndexFile = LogFileUtils.timeIndexFile(tmpDir, offset);
        Assertions.assertEquals(offset, LogFileUtils.offsetFromFile(timeIndexFile));
    }

    /**
     * Rolling an empty segment keeps a single segment at the same base offset; once records have
     * been appended, a roll creates a second segment starting at the log end offset.
     */
    @Test
    public void testRollSegmentThatAlreadyExists() throws IOException {
        Assertions.assertEquals(1, log.segments().numberOfSegments(), "Log begins with a single empty segment.");

        // Roll while the active segment is still empty.
        log.roll(0L);
        Assertions.assertEquals(1, log.segments().numberOfSegments(), "Expect 1 segment after roll() empty segment with base offset.");

        List<KeyValue> firstBatch = List.of(new KeyValue("k1", "v1"));
        appendRecords(kvsToRecords(firstBatch), 0L);
        Assertions.assertEquals(0L, log.segments().activeSegment().baseOffset());

        // Typed list: with a raw List the stream lambda parameter would be an Object.
        List<KeyValue> secondBatch = List.of(new KeyValue("k2", "v2"));
        List<SimpleRecord> laterRecords = secondBatch.stream()
                .map(keyValue -> keyValue.toRecord(MOCK_TIME.milliseconds() + 10))
                .collect(Collectors.toList());
        appendRecords(laterRecords, 1L);
        Assertions.assertEquals(2L, log.logEndOffset(), "Expect two records in the log");

        FetchDataInfo fetched = readRecords(0L);
        Assertions.assertEquals(2L, Utils.toList(fetched.records.records()).size());
        List<KeyValue> expected = Stream.concat(firstBatch.stream(), secondBatch.stream()).collect(Collectors.toList());
        Assertions.assertEquals(expected, recordsToKvs(fetched.records.records()));

        // Roll again now that the active segment holds data.
        log.roll(0L);
        Assertions.assertEquals(2L, log.segments().activeSegment().baseOffset(), "Expect base offset of active segment to be LEO");
        Assertions.assertEquals(2, log.segments().numberOfSegments(), "Expect two segments.");
        Assertions.assertEquals(2L, log.logEndOffset());
    }

    /** Each roll after an append creates a new segment whose base offset is the log end offset. */
    @Test
    public void testNewSegmentsAfterRoll() throws IOException {
        Assertions.assertEquals(1, log.segments().numberOfSegments(), "Log begins with a single empty segment.");

        // Roll with nothing appended: still one segment at offset 0.
        LogSegment afterEmptyRoll = log.roll(0L);
        Assertions.assertEquals(0L, afterEmptyRoll.baseOffset());
        Assertions.assertEquals(1, log.segments().numberOfSegments());
        Assertions.assertEquals(0L, log.logEndOffset());

        // One record, then roll: a second segment starting at offset 1.
        appendRecords(List.of(new KeyValue("k1", "v1").toRecord()), 0L);
        LogSegment afterFirstAppend = log.roll(0L);
        Assertions.assertEquals(1L, afterFirstAppend.baseOffset());
        Assertions.assertEquals(2, log.segments().numberOfSegments());
        Assertions.assertEquals(1L, log.logEndOffset());

        // Another record, then roll: a third segment starting at offset 2.
        appendRecords(List.of(new KeyValue("k2", "v2").toRecord()), 1L);
        LogSegment afterSecondAppend = log.roll(1L);
        Assertions.assertEquals(2L, afterSecondAppend.baseOffset());
        Assertions.assertEquals(3, log.segments().numberOfSegments());
        Assertions.assertEquals(2L, log.logEndOffset());
    }

    /** roll() fails with {@link KafkaException} when the log end offset is behind the active segment's base offset. */
    @Test
    public void testRollSegmentErrorWhenNextOffsetIsIllegal() throws IOException {
        Assertions.assertEquals(1, log.segments().numberOfSegments(), "Log begins with a single empty segment.");

        List<KeyValue> keyValues = List.of(new KeyValue("k1", "v1"), new KeyValue("k2", "v2"), new KeyValue("k3", "v3"));
        appendRecords(kvsToRecords(keyValues), 0L);
        Assertions.assertEquals(0L, log.segments().activeSegment().baseOffset());
        // Three records were appended, so the message must say three (it previously said "two").
        Assertions.assertEquals(3L, log.logEndOffset(), "Expect three records in the log");

        log.roll(0L);
        Assertions.assertEquals(3L, log.segments().activeSegment().baseOffset());

        // Force the log end offset (1) below the active segment's base offset (3).
        log.updateLogEndOffset(1L);
        Assertions.assertThrows(KafkaException.class, () -> log.roll(0L));
    }

    /** flush() must not throw when the log reports a directory that does not exist. */
    @Test
    public void testFlushingNonExistentDir() throws IOException {
        LocalLog spyLog = Mockito.spy(log);

        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
        appendRecords(List.of(record), 0L);
        MOCK_TIME.sleep(1L);
        LogSegment newSegment = log.roll(0L);

        // Make the spy report a directory that is not on disk.
        File missingDir = new File("__NON_EXISTENT__");
        Mockito.doReturn(missingDir).when(spyLog).dir();

        Assertions.assertDoesNotThrow(() -> {
            spyLog.flush(newSegment.baseOffset());
        });
    }

    /**
     * Builds a {@link LocalLog} in the given directory containing one segment opened at offset 0.
     *
     * @param file      directory of the log
     * @param logConfig configuration used for both the segment and the log
     * @return the newly created log
     */
    private LocalLog createLocalLogWithActiveSegment(File file, LogConfig logConfig) throws IOException {
        LogSegments segments = new LogSegments(topicPartition);
        LogSegment firstSegment = LogSegment.open(file, 0L, logConfig, MOCK_TIME, logConfig.initFileSize(), logConfig.preallocate);
        segments.add(firstSegment);

        // NOTE(review): the 0L and the zeroed metadata are presumably the initial recovery point
        // and next-offset metadata - confirm against the LocalLog constructor.
        LogOffsetMetadata initialOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0);
        return new LocalLog(file, logConfig, segments, 0L, initialOffsetMetadata,
                MOCK_TIME.scheduler, MOCK_TIME, topicPartition, logDirFailureChannel);
    }
}
