package org.apache.kafka.server.log.remote.metadata.storage;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Integration test checking that a {@link TopicBasedRemoteLogMetadataManager} only serves remote
 * log segment metadata for partitions it has been assigned through
 * {@code onPartitionLeadershipChanges}, and that metadata published before the assignment
 * becomes visible once the partition is assigned.
 */
@Tag("integration")
@ClusterTestDefaults(brokers = 3)
@ExtendWith({ClusterTestExtensions.class})
public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
    /** Upper bound, in milliseconds, for every wait on a store callback. */
    private static final long AWAIT_TIMEOUT_MS = 30000L;

    /** Size recorded on every segment created by this test (1 MiB). */
    private static final int SEGMENT_SIZE_IN_BYTES = 1048576;

    /** Message expected when publishing metadata for a partition that is not assigned yet. */
    private static final String NOT_ASSIGNED_MESSAGE =
            "org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. "
                    + "Currently assigned partitions: []";

    private final ClusterInstance clusterInstance;
    private final Time time = new SystemTime();

    TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    /**
     * Publishes segment metadata for two partitions before either is assigned, then assigns them
     * in two steps and checks after each step which partitions can be listed.
     *
     * @throws Exception if a wait times out or is interrupted, or if closing the manager fails
     */
    @ClusterTest
    public void testMultiplePartitionSubscriptions() throws Exception {
        String leaderTopic = "leader";
        String followerTopic = "follower";
        String topicWithNoMessages = "no-messages-topic";
        createTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2)));
        createTopic(followerTopic, Collections.singletonMap(0, Arrays.asList(1, 2, 0)));
        createTopic(topicWithNoMessages, Collections.singletonMap(0, Arrays.asList(1, 2, 0)));

        TopicIdPartition leaderTopicIdPartition =
                new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
        TopicIdPartition followerTopicIdPartition =
                new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
        TopicIdPartition emptyTopicIdPartition =
                new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0));

        RemotePartitionMetadataStore spyStore = Mockito.spy(new RemotePartitionMetadataStore());

        // Each phaser starts with two parties: the test thread, which arrives and then waits,
        // and the first expected store callback, which arrives and deregisters itself.
        Phaser initializationPhaser = new Phaser(2);
        Mockito.doAnswer(invocation -> {
            Object result = invocation.callRealMethod();
            initializationPhaser.arriveAndDeregister();
            return result;
        }).when(spyStore).markInitialized(ArgumentMatchers.any());

        Phaser segmentMetadataPhaser = new Phaser(2);
        Mockito.doAnswer(invocation -> {
            Object result = invocation.callRealMethod();
            segmentMetadataPhaser.arriveAndDeregister();
            return result;
        }).when(spyStore).handleRemoteLogSegmentMetadata(ArgumentMatchers.any());

        try (TopicBasedRemoteLogMetadataManager manager = RemoteLogMetadataManagerTestUtils.builder()
                .bootstrapServers(this.clusterInstance.bootstrapServers())
                .startConsumerThread(true)
                .remoteLogMetadataTopicPartitioner(numMetadataTopicPartitions ->
                        new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions.intValue()) {
                            // Send the topic without messages to metadata partition 1 and
                            // every other topic to metadata partition 0.
                            @Override
                            public int metadataPartition(TopicIdPartition candidate) {
                                return emptyTopicIdPartition.equals(candidate) ? 1 : 0;
                            }
                        })
                .remotePartitionMetadataStore(() -> spyStore)
                .build()) {

            // Publishing before any assignment must fail with the "not assigned" error. The same
            // metadata is still expected to reach the store once the partition is assigned below.
            RemoteLogSegmentMetadata leaderSegmentMetadata = newSegmentMetadata(leaderTopicIdPartition);
            assertPublishRejected(manager, leaderSegmentMetadata);
            RemoteLogSegmentMetadata followerSegmentMetadata = newSegmentMetadata(followerTopicIdPartition);
            assertPublishRejected(manager, followerSegmentMetadata);

            // Neither partition can be listed while it is unassigned.
            Assertions.assertThrows(RemoteStorageException.class,
                    () -> manager.listRemoteLogSegments(leaderTopicIdPartition));
            Assertions.assertThrows(RemoteStorageException.class,
                    () -> manager.listRemoteLogSegments(followerTopicIdPartition));

            // Step 1: assign only the leader partition and wait for both store callbacks.
            manager.onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), Collections.emptySet());
            awaitCallbacks(initializationPhaser);
            awaitCallbacks(segmentMetadataPhaser);
            Mockito.verify(spyStore).markInitialized(leaderTopicIdPartition);
            Mockito.verify(spyStore).handleRemoteLogSegmentMetadata(leaderSegmentMetadata);
            Mockito.clearInvocations(spyStore);

            // The leader partition is now readable; the follower partition still is not.
            Assertions.assertTrue(manager.listRemoteLogSegments(leaderTopicIdPartition).hasNext());
            Assertions.assertThrows(RemoteStorageException.class,
                    () -> manager.listRemoteLogSegments(followerTopicIdPartition));

            // Step 2: two more partitions are assigned, so expect two more markInitialized calls
            // but only one more segment event (no segment was created for the empty topic).
            initializationPhaser.bulkRegister(2);
            segmentMetadataPhaser.register();
            manager.onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition),
                    Collections.singleton(followerTopicIdPartition));
            awaitCallbacks(initializationPhaser);
            awaitCallbacks(segmentMetadataPhaser);
            Mockito.verify(spyStore).markInitialized(followerTopicIdPartition);
            Mockito.verify(spyStore).handleRemoteLogSegmentMetadata(followerSegmentMetadata);

            Assertions.assertTrue(manager.listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found");
            Assertions.assertTrue(manager.listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found");
        }
    }

    /** Builds metadata for a fresh segment covering offsets 0 to 100 of the given partition. */
    private RemoteLogSegmentMetadata newSegmentMetadata(TopicIdPartition topicIdPartition) {
        return new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
                0L, 100L, -1L, 0, this.time.milliseconds(), SEGMENT_SIZE_IN_BYTES, Collections.singletonMap(0, 0L));
    }

    /** Asserts that publishing the metadata fails because its partition is not assigned yet. */
    private static void assertPublishRejected(TopicBasedRemoteLogMetadataManager manager,
                                              RemoteLogSegmentMetadata segmentMetadata) {
        ExecutionException exception = Assertions.assertThrows(ExecutionException.class,
                () -> manager.addRemoteLogSegmentMetadata(segmentMetadata).get());
        Assertions.assertEquals(NOT_ASSIGNED_MESSAGE, exception.getMessage());
    }

    /** Arrives as the test thread and waits, up to the timeout, for the other registered parties. */
    private static void awaitCallbacks(Phaser phaser) throws Exception {
        phaser.awaitAdvanceInterruptibly(phaser.arrive(), AWAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a topic with the given replica assignment and waits for the cluster to report it.
     *
     * @param str name of the topic to create
     * @param map replica broker ids keyed by partition number
     */
    private void createTopic(String str, Map<Integer, List<Integer>> map) {
        try (Admin admin = Admin.create(
                Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()))) {
            admin.createTopics(Collections.singletonList(new NewTopic(str, map)));
            Assertions.assertDoesNotThrow(() -> this.clusterInstance.waitForTopic(str, map.size()));
        }
    }
}
