package org.apache.kafka.server.log.remote.metadata.storage;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.mockito.Mockito;

@ClusterTestDefaults(brokers = 3)
/* loaded from: input_file:org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.class */
public class RemoteLogSegmentLifecycleTest {
    private final int segSize = 1048576;
    private final int brokerId0 = 0;
    private final int brokerId1 = 1;
    private final Uuid topicId = Uuid.randomUuid();
    private final TopicPartition tp = new TopicPartition(RemoteLogMetadataSerdeTest.TOPIC, 0);
    private final TopicIdPartition topicIdPartition = new TopicIdPartition(this.topicId, this.tp);
    private final Time time = Time.SYSTEM;
    private final RemotePartitionMetadataStore spyRemotePartitionMetadataStore = (RemotePartitionMetadataStore) Mockito.spy(new RemotePartitionMetadataStore());
    private final ClusterInstance clusterInstance;

    RemoteLogSegmentLifecycleTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    private RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() {
        return RemoteLogMetadataManagerTestUtils.builder().bootstrapServers(this.clusterInstance.bootstrapServers()).startConsumerThread(true).remotePartitionMetadataStore(() -> {
            return this.spyRemotePartitionMetadataStore;
        }).build();
    }

    @ClusterTest
    public void testRemoteLogSegmentLifeCycle() throws Exception {
        RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager();
        try {
            createTopicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(this.topicIdPartition), Collections.emptySet());
            HashMap hashMap = new HashMap();
            hashMap.put(0, 0L);
            hashMap.put(1, 20L);
            hashMap.put(2, 80L);
            RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(this.topicIdPartition, Uuid.randomUuid());
            RemoteLogSegmentMetadata remoteLogSegmentMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 0, this.time.milliseconds(), 1048576, hashMap);
            createTopicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(remoteLogSegmentMetadata).get();
            ((RemotePartitionMetadataStore) Mockito.verify(this.spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadata(remoteLogSegmentMetadata);
            Assertions.assertFalse(createTopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 1, 40L).isPresent());
            for (int i = 0; i <= 2; i++) {
                Assertions.assertFalse(createTopicBasedRemoteLogMetadataManager.highestOffsetForEpoch(this.topicIdPartition, i).isPresent());
            }
            RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, this.time.milliseconds(), Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
            createTopicBasedRemoteLogMetadataManager.updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate).get();
            ((RemotePartitionMetadataStore) Mockito.verify(this.spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadataUpdate(remoteLogSegmentMetadataUpdate);
            RemoteLogSegmentMetadata createWithUpdates = remoteLogSegmentMetadata.createWithUpdates(remoteLogSegmentMetadataUpdate);
            RemoteLogSegmentMetadata upsertSegmentState = upsertSegmentState(createTopicBasedRemoteLogMetadataManager, Collections.singletonMap(2, 101L), 101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            HashMap hashMap2 = new HashMap();
            hashMap2.put(2, 201L);
            hashMap2.put(3, 240L);
            RemoteLogSegmentMetadata upsertSegmentState2 = upsertSegmentState(createTopicBasedRemoteLogMetadataManager, hashMap2, 201L, 300L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            HashMap hashMap3 = new HashMap();
            hashMap3.put(3, 250L);
            hashMap3.put(4, 370L);
            RemoteLogSegmentMetadata upsertSegmentState3 = upsertSegmentState(createTopicBasedRemoteLogMetadataManager, hashMap3, 250L, 400L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            HashMap hashMap4 = new HashMap();
            hashMap4.put(new EpochEntry(1, 40L), createWithUpdates);
            hashMap4.put(new EpochEntry(2, 110L), upsertSegmentState);
            hashMap4.put(new EpochEntry(3, 240L), upsertSegmentState2);
            hashMap4.put(new EpochEntry(3, 250L), upsertSegmentState3);
            hashMap4.put(new EpochEntry(4, 375L), upsertSegmentState3);
            hashMap4.put(new EpochEntry(1, 110L), null);
            hashMap4.put(new EpochEntry(4, 401L), null);
            hashMap4.put(new EpochEntry(5, 301L), null);
            for (Map.Entry entry : hashMap4.entrySet()) {
                EpochEntry epochEntry = (EpochEntry) entry.getKey();
                Optional remoteLogSegmentMetadata2 = createTopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(this.topicIdPartition, epochEntry.epoch, epochEntry.startOffset);
                RemoteLogSegmentMetadata remoteLogSegmentMetadata3 = (RemoteLogSegmentMetadata) entry.getValue();
                if (remoteLogSegmentMetadata3 != null) {
                    Assertions.assertEquals(Optional.of(remoteLogSegmentMetadata3), remoteLogSegmentMetadata2);
                } else {
                    Assertions.assertFalse(remoteLogSegmentMetadata2.isPresent());
                }
            }
            createTopicBasedRemoteLogMetadataManager.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(createWithUpdates.remoteLogSegmentId(), this.time.milliseconds(), Optional.empty(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, 1)).get();
            Assertions.assertFalse(createTopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 0, 10L).isPresent());
            createTopicBasedRemoteLogMetadataManager.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(createWithUpdates.remoteLogSegmentId(), this.time.milliseconds(), Optional.empty(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, 1)).get();
            Assertions.assertFalse(createTopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 0, 10L).isPresent());
            HashMap hashMap5 = new HashMap();
            hashMap5.put(0, 19L);
            hashMap5.put(1, 79L);
            hashMap5.put(2, 239L);
            hashMap5.put(3, 369L);
            hashMap5.put(4, 400L);
            for (Map.Entry entry2 : hashMap5.entrySet()) {
                Integer num = (Integer) entry2.getKey();
                Assertions.assertEquals(Optional.of((Long) entry2.getValue()), createTopicBasedRemoteLogMetadataManager.highestOffsetForEpoch(this.topicIdPartition, num.intValue()));
            }
            Assertions.assertFalse(createTopicBasedRemoteLogMetadataManager.highestOffsetForEpoch(this.topicIdPartition, 5).isPresent());
            if (createTopicBasedRemoteLogMetadataManager != null) {
                createTopicBasedRemoteLogMetadataManager.close();
            }
        } catch (Throwable th) {
            if (createTopicBasedRemoteLogMetadataManager != null) {
                try {
                    createTopicBasedRemoteLogMetadataManager.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private RemoteLogSegmentMetadata upsertSegmentState(RemoteLogMetadataManager remoteLogMetadataManager, Map<Integer, Long> map, long j, long j2, RemoteLogSegmentState remoteLogSegmentState) throws RemoteStorageException, ExecutionException, InterruptedException {
        RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(this.topicIdPartition, Uuid.randomUuid());
        RemoteLogSegmentMetadata remoteLogSegmentMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, j, j2, -1L, 0, this.time.milliseconds(), 1048576, map);
        remoteLogMetadataManager.addRemoteLogSegmentMetadata(remoteLogSegmentMetadata).get();
        ((RemotePartitionMetadataStore) Mockito.verify(this.spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadata(remoteLogSegmentMetadata);
        RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, this.time.milliseconds(), Optional.empty(), remoteLogSegmentState, 1);
        remoteLogMetadataManager.updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate).get();
        ((RemotePartitionMetadataStore) Mockito.verify(this.spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadataUpdate(remoteLogSegmentMetadataUpdate);
        return remoteLogSegmentMetadata.createWithUpdates(remoteLogSegmentMetadataUpdate);
    }

    private void checkListSegments(RemoteLogMetadataManager remoteLogMetadataManager, int i, RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        Iterator listRemoteLogSegments = remoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition, i);
        Assertions.assertTrue(listRemoteLogSegments.hasNext());
        Assertions.assertEquals(remoteLogSegmentMetadata, listRemoteLogSegments.next());
        Iterator listRemoteLogSegments2 = remoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition);
        Assertions.assertTrue(listRemoteLogSegments2.hasNext());
        Assertions.assertEquals(remoteLogSegmentMetadata, listRemoteLogSegments2.next());
    }

    @ClusterTest
    public void testCacheSegmentWithCopySegmentStartedState() throws Exception {
        RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager();
        try {
            createTopicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(this.topicIdPartition), Collections.emptySet());
            RemoteLogSegmentMetadata remoteLogSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(this.topicIdPartition, Uuid.randomUuid()), 0L, 50L, -1L, 0, this.time.milliseconds(), 1048576, Collections.singletonMap(0, 0L));
            createTopicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(remoteLogSegmentMetadata).get();
            ((RemotePartitionMetadataStore) Mockito.verify(this.spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadata(remoteLogSegmentMetadata);
            Assertions.assertFalse(createTopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 0, 0L).isPresent());
            checkListSegments(createTopicBasedRemoteLogMetadataManager, 0, remoteLogSegmentMetadata);
            if (createTopicBasedRemoteLogMetadataManager != null) {
                createTopicBasedRemoteLogMetadataManager.close();
            }
        } catch (Throwable th) {
            if (createTopicBasedRemoteLogMetadataManager != null) {
                try {
                    createTopicBasedRemoteLogMetadataManager.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @ClusterTest
    public void testCacheSegmentWithCopySegmentFinishedState() throws Exception {
        RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager();
        try {
            createTopicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(this.topicIdPartition), Collections.emptySet());
            RemoteLogSegmentMetadata upsertSegmentState = upsertSegmentState(createTopicBasedRemoteLogMetadataManager, Collections.singletonMap(0, 101L), 101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            Assertions.assertEquals(Optional.of(upsertSegmentState), createTopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 0, 150L));
            checkListSegments(createTopicBasedRemoteLogMetadataManager, 0, upsertSegmentState);
            if (createTopicBasedRemoteLogMetadataManager != null) {
                createTopicBasedRemoteLogMetadataManager.close();
            }
        } catch (Throwable th) {
            if (createTopicBasedRemoteLogMetadataManager != null) {
                try {
                    createTopicBasedRemoteLogMetadataManager.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @ClusterTest
    public void testCacheSegmentWithDeleteSegmentStartedState() throws Exception {
        RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager();
        try {
            createTopicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(this.topicIdPartition), Collections.emptySet());
            RemoteLogSegmentMetadata upsertSegmentState = upsertSegmentState(createTopicBasedRemoteLogMetadataManager, Collections.singletonMap(0, 201L), 201L, 300L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
            Assertions.assertFalse(createTopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 0, 250L).isPresent());
            checkListSegments(createTopicBasedRemoteLogMetadataManager, 0, upsertSegmentState);
            if (createTopicBasedRemoteLogMetadataManager != null) {
                createTopicBasedRemoteLogMetadataManager.close();
            }
        } catch (Throwable th) {
            if (createTopicBasedRemoteLogMetadataManager != null) {
                try {
                    createTopicBasedRemoteLogMetadataManager.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @ClusterTest
    public void testCacheSegmentsWithDeleteSegmentFinishedState() throws Exception {
        RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager();
        try {
            createTopicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(this.topicIdPartition), Collections.emptySet());
            RemoteLogSegmentMetadata upsertSegmentState = upsertSegmentState(createTopicBasedRemoteLogMetadataManager, Collections.singletonMap(0, 301L), 301L, 400L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
            Assertions.assertFalse(createTopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 0, 350L).isPresent());
            RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(upsertSegmentState.remoteLogSegmentId(), this.time.milliseconds(), Optional.empty(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, 1);
            createTopicBasedRemoteLogMetadataManager.updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate).get();
            ((RemotePartitionMetadataStore) Mockito.verify(this.spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadataUpdate(remoteLogSegmentMetadataUpdate);
            Assertions.assertFalse(createTopicBasedRemoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition, 0).hasNext());
            Assertions.assertFalse(createTopicBasedRemoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition).hasNext());
            if (createTopicBasedRemoteLogMetadataManager != null) {
                createTopicBasedRemoteLogMetadataManager.close();
            }
        } catch (Throwable th) {
            if (createTopicBasedRemoteLogMetadataManager != null) {
                try {
                    createTopicBasedRemoteLogMetadataManager.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @ClusterTest
    public void testCacheListSegments() throws Exception {
        RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager();
        try {
            createTopicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(this.topicIdPartition), Collections.emptySet());
            RemoteLogSegmentMetadata upsertSegmentState = upsertSegmentState(createTopicBasedRemoteLogMetadataManager, Collections.singletonMap(0, 0L), 0L, 100L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            RemoteLogSegmentMetadata upsertSegmentState2 = upsertSegmentState(createTopicBasedRemoteLogMetadataManager, Collections.singletonMap(0, 101L), 101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            HashMap hashMap = new HashMap();
            hashMap.put(0, 201L);
            hashMap.put(1, 301L);
            RemoteLogSegmentMetadata upsertSegmentState3 = upsertSegmentState(createTopicBasedRemoteLogMetadataManager, hashMap, 201L, 400L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            List asList = Arrays.asList(upsertSegmentState, upsertSegmentState2, upsertSegmentState3);
            Assertions.assertTrue(TestUtils.sameElementsWithOrder(asList.iterator(), createTopicBasedRemoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition, 0)));
            Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(asList.iterator(), createTopicBasedRemoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition)));
            Assertions.assertTrue(TestUtils.sameElementsWithOrder(Collections.singletonList(upsertSegmentState3).iterator(), createTopicBasedRemoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition, 1)));
            if (createTopicBasedRemoteLogMetadataManager != null) {
                createTopicBasedRemoteLogMetadataManager.close();
            }
        } catch (Throwable th) {
            if (createTopicBasedRemoteLogMetadataManager != null) {
                try {
                    createTopicBasedRemoteLogMetadataManager.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
