package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.class */
public class ShareFetchBufferTest {
    private final Time time = new MockTime(0, 0, 0);
    private final TopicIdPartition topicAPartition0 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic-a");
    private final TopicIdPartition topicAPartition1 = new TopicIdPartition(Uuid.randomUuid(), 1, "topic-a");
    private final TopicIdPartition topicAPartition2 = new TopicIdPartition(Uuid.randomUuid(), 2, "topic-a");
    private final Set<TopicIdPartition> allPartitions = partitions(this.topicAPartition0, this.topicAPartition1, this.topicAPartition2);
    private LogContext logContext;
    private ShareFetchMetricsManager shareFetchMetricsManager;

    @BeforeEach
    public void setup() {
        this.logContext = new LogContext();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", StringSerializer.class.getName());
        properties.put("value.deserializer", StringSerializer.class.getName());
        this.shareFetchMetricsManager = ConsumerUtils.createShareFetchMetricsManager(ConsumerUtils.createMetrics(new ConsumerConfig(properties), this.time));
    }

    @Test
    public void testBasicPeekAndPoll() {
        ShareFetchBuffer shareFetchBuffer = new ShareFetchBuffer(this.logContext);
        try {
            ShareCompletedFetch completedFetch = completedFetch(this.topicAPartition0);
            Assertions.assertTrue(shareFetchBuffer.isEmpty());
            shareFetchBuffer.add(completedFetch);
            Assertions.assertTrue(shareFetchBuffer.hasCompletedFetches(shareCompletedFetch -> {
                return true;
            }));
            Assertions.assertFalse(shareFetchBuffer.isEmpty());
            Assertions.assertNotNull(shareFetchBuffer.peek());
            Assertions.assertSame(completedFetch, shareFetchBuffer.peek());
            Assertions.assertSame(completedFetch, shareFetchBuffer.poll());
            Assertions.assertNull(shareFetchBuffer.peek());
            shareFetchBuffer.close();
        } catch (Throwable th) {
            try {
                shareFetchBuffer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testCloseClearsData() {
        ShareFetchBuffer shareFetchBuffer = null;
        try {
            shareFetchBuffer = new ShareFetchBuffer(this.logContext);
            Assertions.assertNull(shareFetchBuffer.nextInLineFetch());
            Assertions.assertTrue(shareFetchBuffer.isEmpty());
            shareFetchBuffer.add(completedFetch(this.topicAPartition0));
            Assertions.assertFalse(shareFetchBuffer.isEmpty());
            shareFetchBuffer.setNextInLineFetch(completedFetch(this.topicAPartition0));
            Assertions.assertNotNull(shareFetchBuffer.nextInLineFetch());
            if (shareFetchBuffer != null) {
                shareFetchBuffer.close();
            }
            Assertions.assertNull(shareFetchBuffer.nextInLineFetch());
            Assertions.assertTrue(shareFetchBuffer.isEmpty());
        } catch (Throwable th) {
            if (shareFetchBuffer != null) {
                shareFetchBuffer.close();
            }
            throw th;
        }
    }

    @Test
    public void testBufferedPartitions() {
        ShareFetchBuffer shareFetchBuffer = new ShareFetchBuffer(this.logContext);
        try {
            shareFetchBuffer.setNextInLineFetch(completedFetch(this.topicAPartition0));
            shareFetchBuffer.add(completedFetch(this.topicAPartition1));
            shareFetchBuffer.add(completedFetch(this.topicAPartition2));
            Assertions.assertEquals(this.allPartitions, shareFetchBuffer.bufferedPartitions());
            shareFetchBuffer.setNextInLineFetch((ShareCompletedFetch) null);
            Assertions.assertEquals(partitions(this.topicAPartition1, this.topicAPartition2), shareFetchBuffer.bufferedPartitions());
            shareFetchBuffer.poll();
            Assertions.assertEquals(partitions(this.topicAPartition2), shareFetchBuffer.bufferedPartitions());
            shareFetchBuffer.poll();
            Assertions.assertEquals(partitions(new TopicIdPartition[0]), shareFetchBuffer.bufferedPartitions());
            shareFetchBuffer.close();
        } catch (Throwable th) {
            try {
                shareFetchBuffer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testWakeup() throws Exception {
        ShareFetchBuffer shareFetchBuffer = new ShareFetchBuffer(this.logContext);
        try {
            Thread thread = new Thread(() -> {
                shareFetchBuffer.awaitNotEmpty(this.time.timer(Duration.ofMinutes(1L)));
            });
            thread.start();
            shareFetchBuffer.wakeup();
            thread.join(Duration.ofSeconds(30L).toMillis());
            Assertions.assertFalse(thread.isAlive());
            shareFetchBuffer.close();
        } catch (Throwable th) {
            try {
                shareFetchBuffer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private ShareCompletedFetch completedFetch(TopicIdPartition topicIdPartition) {
        ShareFetchMetricsAggregator shareFetchMetricsAggregator = new ShareFetchMetricsAggregator(this.shareFetchMetricsManager, (Set) this.allPartitions.stream().map((v0) -> {
            return v0.topicPartition();
        }).collect(Collectors.toSet()));
        return new ShareCompletedFetch(this.logContext, BufferSupplier.create(), topicIdPartition, new ShareFetchResponseData.PartitionData(), shareFetchMetricsAggregator, ApiKeys.SHARE_FETCH.latestVersion());
    }

    private static Set<TopicIdPartition> partitions(TopicIdPartition... topicIdPartitionArr) {
        return new HashSet(Arrays.asList(topicIdPartitionArr));
    }
}
