package kafka.server.share;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import kafka.cluster.Partition;
import kafka.cluster.PartitionListener;
import kafka.log.remote.RemoteLogReaderTest;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.share.DelayedShareFetchTest;
import kafka.server.share.SharePartitionManager;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.Option;
import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;

@Timeout(120)
/* loaded from: input_file:kafka/server/share/SharePartitionManagerTest.class */
public class SharePartitionManagerTest {
    private static final int DEFAULT_RECORD_LOCK_DURATION_MS = 30000;
    private static final int MAX_DELIVERY_COUNT = 5;
    private static final short MAX_IN_FLIGHT_MESSAGES = 200;
    private static final short MAX_FETCH_RECORDS = 500;
    private static final int DELAYED_SHARE_FETCH_MAX_WAIT_MS = 2000;
    private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000;
    static final int PARTITION_MAX_BYTES = 40000;
    static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
    private Timer mockTimer;
    private ReplicaManager mockReplicaManager;
    private static final FetchParams FETCH_PARAMS = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), -1, -1, 2000, 1, 1048576, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true);
    private static final List<TopicIdPartition> EMPTY_PART_LIST = Collections.unmodifiableList(new ArrayList());

    /* loaded from: input_file:kafka/server/share/SharePartitionManagerTest$SharePartitionManagerBuilder.class */
    static class SharePartitionManagerBuilder {
        private ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        private Time time = new MockTime();
        private ShareSessionCache cache = new ShareSessionCache(10, 1000);
        private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap();
        private Persister persister = new NoOpShareStatePersister();
        private Timer timer = new MockTimer();
        private Metrics metrics = new Metrics();

        SharePartitionManagerBuilder() {
        }

        private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
            this.replicaManager = replicaManager;
            return this;
        }

        private SharePartitionManagerBuilder withTime(Time time) {
            this.time = time;
            return this;
        }

        private SharePartitionManagerBuilder withCache(ShareSessionCache shareSessionCache) {
            this.cache = shareSessionCache;
            return this;
        }

        SharePartitionManagerBuilder withPartitionCacheMap(Map<SharePartitionKey, SharePartition> map) {
            this.partitionCacheMap = map;
            return this;
        }

        private SharePartitionManagerBuilder withShareGroupPersister(Persister persister) {
            this.persister = persister;
            return this;
        }

        private SharePartitionManagerBuilder withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        private SharePartitionManagerBuilder withMetrics(Metrics metrics) {
            this.metrics = metrics;
            return this;
        }

        public static SharePartitionManagerBuilder builder() {
            return new SharePartitionManagerBuilder();
        }

        public SharePartitionManager build() {
            return new SharePartitionManager(this.replicaManager, this.time, this.cache, this.partitionCacheMap, SharePartitionManagerTest.DEFAULT_RECORD_LOCK_DURATION_MS, this.timer, SharePartitionManagerTest.MAX_DELIVERY_COUNT, SharePartitionManagerTest.MAX_IN_FLIGHT_MESSAGES, SharePartitionManagerTest.MAX_FETCH_RECORDS, this.persister, (GroupConfigManager) Mockito.mock(GroupConfigManager.class), this.metrics);
        }
    }

    @BeforeEach
    public void setUp() {
        this.mockTimer = new SystemTimerReaper("sharePartitionManagerTestReaper", new SystemTimer("sharePartitionManagerTestTimer"));
        this.mockReplicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(this.mockReplicaManager.getPartitionOrException((TopicPartition) Mockito.any())).thenReturn(mockPartition());
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.mockTimer.close();
    }

    @Test
    public void testNewContextReturnsFinalContextWithoutRequestData() {
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(new ShareSessionCache(10, 1000L)).withTime(new MockTime()).build();
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        Uuid randomUuid2 = Uuid.randomUuid();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), PARTITION_MAX_BYTES));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), PARTITION_MAX_BYTES));
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, new ShareRequestMetadata(randomUuid2, 0), false);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        Assertions.assertEquals(FinalContext.class, build.newContext("grp", Collections.emptyMap(), Collections.emptyList(), new ShareRequestMetadata(randomUuid2, -1), true).getClass());
    }

    @Test
    public void testNewContextReturnsFinalContextWithRequestData() {
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(new ShareSessionCache(10, 1000L)).withTime(new MockTime()).build();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        Uuid randomUuid3 = Uuid.randomUuid();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), PARTITION_MAX_BYTES));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), PARTITION_MAX_BYTES));
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, new ShareRequestMetadata(randomUuid3, 0), false);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        Assertions.assertEquals(FinalContext.class, build.newContext("grp", Collections.singletonMap(new TopicIdPartition(randomUuid2, new TopicPartition("foo", 0)), new ShareFetchRequest.SharePartitionData(randomUuid2, 0)), Collections.emptyList(), new ShareRequestMetadata(randomUuid3, -1), true).getClass());
    }

    @Test
    public void testNewContextReturnsFinalContextError() {
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(new ShareSessionCache(10, 1000L)).withTime(new MockTime()).build();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        String str = "grp";
        Uuid randomUuid3 = Uuid.randomUuid();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), PARTITION_MAX_BYTES));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), PARTITION_MAX_BYTES));
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, new ShareRequestMetadata(randomUuid3, 0), false);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        ShareRequestMetadata shareRequestMetadata = new ShareRequestMetadata(randomUuid3, -1);
        Map singletonMap = Collections.singletonMap(new TopicIdPartition(randomUuid2, new TopicPartition("foo", 0)), new ShareFetchRequest.SharePartitionData(randomUuid2, PARTITION_MAX_BYTES));
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            build.newContext(str, singletonMap, Collections.emptyList(), shareRequestMetadata, true);
        });
    }

    @Test
    public void testNewContext() {
        Time mockTime = new MockTime();
        ShareSessionCache shareSessionCache = new ShareSessionCache(10, 1000L);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(shareSessionCache).withTime(mockTime).build();
        HashMap hashMap = new HashMap();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        hashMap.put(randomUuid, "foo");
        hashMap.put(randomUuid2, "bar");
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 0));
        TopicIdPartition topicIdPartition4 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 1));
        String str = "grp";
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareRequestMetadata shareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareRequestMetadata, false);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        newContext.shareFetchData().forEach((topicIdPartition5, sharePartitionData) -> {
            Assertions.assertTrue(linkedHashMap.containsKey(topicIdPartition5));
            Assertions.assertEquals(linkedHashMap.get(topicIdPartition5), sharePartitionData);
        });
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        ShareFetchResponse updateAndGenerateResponseData = newContext.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData.error());
        Assertions.assertEquals(linkedHashMap2, updateAndGenerateResponseData.responseData(hashMap));
        ShareSessionKey shareSessionKey = new ShareSessionKey("grp", shareRequestMetadata.memberId());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), MAX_DELIVERY_COUNT), true);
        });
        Uuid randomUuid3 = Uuid.randomUuid();
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareRequestMetadata(randomUuid3, 1), true);
        });
        ShareSessionContext newContext2 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), 1), true);
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        Assertions.assertTrue(newContext2.isSubsequent());
        ShareSessionContext shareSessionContext = newContext2;
        synchronized (shareSessionContext.session()) {
            shareSessionContext.session().partitionMap().forEach(cachedSharePartition -> {
                TopicIdPartition topicIdPartition6 = new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
                ShareFetchRequest.SharePartitionData reqData = cachedSharePartition.reqData();
                Assertions.assertTrue(linkedHashMap.containsKey(topicIdPartition6));
                Assertions.assertEquals(linkedHashMap.get(topicIdPartition6), reqData);
            });
        }
        ShareFetchResponse updateAndGenerateResponseData2 = newContext2.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData2.error());
        Assertions.assertEquals(0, updateAndGenerateResponseData2.responseData(hashMap).size());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), MAX_DELIVERY_COUNT), true);
        });
        ShareFetchResponse throttleResponse = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), 2), true).throttleResponse(100);
        Assertions.assertEquals(Errors.NONE, throttleResponse.error());
        Assertions.assertEquals(100, throttleResponse.throttleTimeMs());
        ShareFetchContext newContext3 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareRequestMetadata.memberId(), -1), true);
        Assertions.assertEquals(FinalContext.class, newContext3.getClass());
        Assertions.assertEquals(1, shareSessionCache.size());
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        linkedHashMap3.put(topicIdPartition3, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        linkedHashMap3.put(topicIdPartition4, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        Assertions.assertEquals(Errors.NONE, newContext3.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap3).error());
        CompletableFuture releaseSession = build.releaseSession("grp", shareRequestMetadata.memberId().toString());
        Assertions.assertTrue(releaseSession.isDone());
        Assertions.assertFalse(releaseSession.isCompletedExceptionally());
        Assertions.assertEquals(0, shareSessionCache.size());
    }

    @Test
    public void testShareSessionExpiration() {
        Time mockTime = new MockTime();
        ShareSessionCache shareSessionCache = new ShareSessionCache(2, 1000L);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(shareSessionCache).withTime(mockTime).build();
        HashMap hashMap = new HashMap();
        Uuid randomUuid = Uuid.randomUuid();
        hashMap.put(randomUuid, "foo");
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareRequestMetadata shareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareRequestMetadata, false);
        Assertions.assertEquals(newContext.getClass(), ShareSessionContext.class);
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        ShareFetchResponse updateAndGenerateResponseData = newContext.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData.error());
        Assertions.assertEquals(2, updateAndGenerateResponseData.responseData(hashMap).size());
        ShareSessionKey shareSessionKey = new ShareSessionKey("grp", shareRequestMetadata.memberId());
        Assertions.assertNotNull(shareSessionCache.get(shareSessionKey));
        mockTime.sleep(500L);
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        linkedHashMap3.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap3.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareRequestMetadata shareRequestMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext newContext2 = build.newContext("grp", linkedHashMap3, EMPTY_PART_LIST, shareRequestMetadata2, false);
        Assertions.assertEquals(newContext2.getClass(), ShareSessionContext.class);
        LinkedHashMap linkedHashMap4 = new LinkedHashMap();
        linkedHashMap4.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()));
        linkedHashMap4.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        ShareFetchResponse updateAndGenerateResponseData2 = newContext2.updateAndGenerateResponseData("grp", shareRequestMetadata2.memberId(), linkedHashMap4);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData2.error());
        Assertions.assertEquals(2, updateAndGenerateResponseData2.responseData(hashMap).size());
        ShareSessionKey shareSessionKey2 = new ShareSessionKey("grp", shareRequestMetadata2.memberId());
        Assertions.assertNotNull(shareSessionCache.get(shareSessionKey));
        Assertions.assertNotNull(shareSessionCache.get(shareSessionKey2));
        mockTime.sleep(500L);
        Assertions.assertEquals(build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareRequestMetadata.memberId(), 1), true).getClass(), ShareSessionContext.class);
        mockTime.sleep(501L);
        LinkedHashMap linkedHashMap5 = new LinkedHashMap();
        linkedHashMap5.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap5.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareRequestMetadata shareRequestMetadata3 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext newContext3 = build.newContext("grp", linkedHashMap5, EMPTY_PART_LIST, shareRequestMetadata3, false);
        LinkedHashMap linkedHashMap6 = new LinkedHashMap();
        linkedHashMap6.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()));
        linkedHashMap6.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        ShareFetchResponse updateAndGenerateResponseData3 = newContext3.updateAndGenerateResponseData("grp", shareRequestMetadata3.memberId(), linkedHashMap6);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData3.error());
        Assertions.assertEquals(2, updateAndGenerateResponseData3.responseData(hashMap).size());
        ShareSessionKey shareSessionKey3 = new ShareSessionKey("grp", shareRequestMetadata3.memberId());
        Assertions.assertNotNull(shareSessionCache.get(shareSessionKey));
        Assertions.assertNull(shareSessionCache.get(shareSessionKey2), "share session 2 should have been evicted by latest share session, as share session 1 was used more recently");
        Assertions.assertNotNull(shareSessionCache.get(shareSessionKey3));
    }

    @Test
    public void testSubsequentShareSession() {
        SharePartitionManager build = SharePartitionManagerBuilder.builder().build();
        HashMap hashMap = new HashMap();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        hashMap.put(randomUuid, "foo");
        hashMap.put(randomUuid2, "bar");
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 0));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareRequestMetadata shareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareRequestMetadata, false);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        ShareFetchResponse updateAndGenerateResponseData = newContext.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData.error());
        Assertions.assertEquals(2, updateAndGenerateResponseData.responseData(hashMap).size());
        Map singletonMap = Collections.singletonMap(topicIdPartition3, new ShareFetchRequest.SharePartitionData(topicIdPartition3.topicId(), 100));
        ArrayList arrayList = new ArrayList();
        arrayList.add(topicIdPartition);
        ShareSessionContext newContext2 = build.newContext("grp", singletonMap, arrayList, new ShareRequestMetadata(shareRequestMetadata.memberId(), 1), true);
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        HashSet hashSet = new HashSet();
        hashSet.add(topicIdPartition2);
        hashSet.add(topicIdPartition3);
        HashSet hashSet2 = new HashSet();
        newContext2.session().partitionMap().forEach(cachedSharePartition -> {
            hashSet2.add(new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition())));
        });
        Assertions.assertEquals(hashSet, hashSet2);
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        linkedHashMap3.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        linkedHashMap3.put(topicIdPartition3, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition3.partition()));
        ShareFetchResponse updateAndGenerateResponseData2 = newContext2.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap3);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData2.error());
        Assertions.assertEquals(1, updateAndGenerateResponseData2.data().responses().size());
        Assertions.assertEquals(randomUuid2, ((ShareFetchResponseData.ShareFetchableTopicResponse) updateAndGenerateResponseData2.data().responses().get(0)).topicId());
        Assertions.assertEquals(1, ((ShareFetchResponseData.ShareFetchableTopicResponse) updateAndGenerateResponseData2.data().responses().get(0)).partitions().size());
        Assertions.assertEquals(0, ((ShareFetchResponseData.PartitionData) ((ShareFetchResponseData.ShareFetchableTopicResponse) updateAndGenerateResponseData2.data().responses().get(0)).partitions().get(0)).partitionIndex());
        Assertions.assertEquals(1, updateAndGenerateResponseData2.responseData(hashMap).size());
    }

    @Test
    public void testZeroSizeShareSession() {
        ShareSessionCache shareSessionCache = new ShareSessionCache(10, 1000L);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(shareSessionCache).build();
        HashMap hashMap = new HashMap();
        Uuid randomUuid = Uuid.randomUuid();
        hashMap.put(randomUuid, "foo");
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareRequestMetadata shareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareRequestMetadata, false);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        ShareFetchResponse updateAndGenerateResponseData = newContext.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData.error());
        Assertions.assertEquals(2, updateAndGenerateResponseData.responseData(hashMap).size());
        ArrayList arrayList = new ArrayList();
        arrayList.add(topicIdPartition);
        arrayList.add(topicIdPartition2);
        ShareFetchContext newContext2 = build.newContext("grp", Collections.emptyMap(), arrayList, new ShareRequestMetadata(shareRequestMetadata.memberId(), 1), true);
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        Assertions.assertTrue(newContext2.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), new LinkedHashMap()).responseData(hashMap).isEmpty());
        Assertions.assertEquals(1, shareSessionCache.size());
    }

    @Test
    public void testToForgetPartitions() {
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(new ShareSessionCache(10, 1000L)).build();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 0));
        ShareRequestMetadata shareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareFetchContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareRequestMetadata, false);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        assertPartitionsPresent((ShareSessionContext) newContext, Arrays.asList(topicIdPartition, topicIdPartition2));
        mockUpdateAndGenerateResponseData(newContext, "grp", shareRequestMetadata.memberId());
        ShareFetchContext newContext2 = build.newContext("grp", Collections.emptyMap(), Collections.singletonList(topicIdPartition), new ShareRequestMetadata(shareRequestMetadata.memberId(), 1), true);
        assertPartitionsPresent((ShareSessionContext) newContext2, Collections.singletonList(topicIdPartition2));
        mockUpdateAndGenerateResponseData(newContext2, "grp", shareRequestMetadata.memberId());
        assertPartitionsPresent((ShareSessionContext) build.newContext("grp", Collections.emptyMap(), Collections.singletonList(topicIdPartition2), new ShareRequestMetadata(shareRequestMetadata.memberId(), 2), true), Collections.emptyList());
    }

    @Test
    public void testShareSessionUpdateTopicIdsBrokerSide() {
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(new ShareSessionCache(10, 1000L)).build();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 1));
        HashMap hashMap = new HashMap();
        hashMap.put(randomUuid, "foo");
        hashMap.put(randomUuid2, "bar");
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareRequestMetadata shareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareRequestMetadata, false);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
        ShareFetchResponse updateAndGenerateResponseData = newContext.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData.error());
        Assertions.assertEquals(2, updateAndGenerateResponseData.responseData(hashMap).size());
        ShareSessionContext newContext2 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareRequestMetadata.memberId(), 1), true);
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        Assertions.assertTrue(newContext2.isSubsequent());
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        linkedHashMap3.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()).setErrorCode(Errors.INCONSISTENT_TOPIC_ID.code()));
        ShareFetchResponse updateAndGenerateResponseData2 = newContext2.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap3);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData2.error());
        Assertions.assertEquals(Errors.INCONSISTENT_TOPIC_ID.code(), ((ShareFetchResponseData.PartitionData) updateAndGenerateResponseData2.responseData(hashMap).get(topicIdPartition)).errorCode());
    }

    @Test
    public void testGetErroneousAndValidTopicIdPartitions() {
        Time mockTime = new MockTime();
        ShareSessionCache shareSessionCache = new ShareSessionCache(10, 1000L);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(shareSessionCache).withTime(mockTime).build();
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition((String) null, 0));
        TopicIdPartition topicIdPartition4 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition((String) null, 1));
        String str = "grp";
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        linkedHashMap.put(topicIdPartition3, new ShareFetchRequest.SharePartitionData(topicIdPartition3.topicId(), 100));
        ShareRequestMetadata shareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareRequestMetadata, false);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        assertErroneousAndValidTopicIdPartitions(newContext.getErroneousAndValidTopicIdPartitions(), Collections.singletonList(topicIdPartition3), Arrays.asList(topicIdPartition, topicIdPartition2));
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        linkedHashMap2.put(topicIdPartition3, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        Assertions.assertEquals(Errors.NONE, newContext.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap2).error());
        ShareSessionKey shareSessionKey = new ShareSessionKey("grp", shareRequestMetadata.memberId());
        ShareFetchResponse throttleResponse = newContext.throttleResponse(100);
        Assertions.assertEquals(Errors.NONE, throttleResponse.error());
        Assertions.assertEquals(100, throttleResponse.throttleTimeMs());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), MAX_DELIVERY_COUNT), true);
        });
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareRequestMetadata(Uuid.randomUuid(), 1), true);
        });
        ShareSessionContext newContext2 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), 1), true);
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        Assertions.assertTrue(newContext2.isSubsequent());
        assertErroneousAndValidTopicIdPartitions(newContext2.getErroneousAndValidTopicIdPartitions(), Collections.singletonList(topicIdPartition3), Arrays.asList(topicIdPartition, topicIdPartition2));
        Assertions.assertEquals(Errors.NONE, newContext2.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap2).error());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), MAX_DELIVERY_COUNT), true);
        });
        ShareFetchContext newContext3 = build.newContext("grp", Collections.singletonMap(topicIdPartition4, new ShareFetchRequest.SharePartitionData(topicIdPartition4.topicId(), 100)), EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), 2), true);
        ShareFetchResponse throttleResponse2 = newContext3.throttleResponse(100);
        Assertions.assertEquals(Errors.NONE, throttleResponse2.error());
        Assertions.assertEquals(100, throttleResponse2.throttleTimeMs());
        assertErroneousAndValidTopicIdPartitions(newContext3.getErroneousAndValidTopicIdPartitions(), Arrays.asList(topicIdPartition3, topicIdPartition4), Arrays.asList(topicIdPartition, topicIdPartition2));
        ShareFetchContext newContext4 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareRequestMetadata.memberId(), -1), true);
        Assertions.assertEquals(FinalContext.class, newContext4.getClass());
        Assertions.assertEquals(1, shareSessionCache.size());
        assertErroneousAndValidTopicIdPartitions(newContext4.getErroneousAndValidTopicIdPartitions(), Collections.emptyList(), Collections.emptyList());
        ShareFetchResponse throttleResponse3 = newContext4.throttleResponse(100);
        Assertions.assertEquals(Errors.NONE, throttleResponse3.error());
        Assertions.assertEquals(100, throttleResponse3.throttleTimeMs());
        CompletableFuture releaseSession = build.releaseSession("grp", shareRequestMetadata.memberId().toString());
        Assertions.assertTrue(releaseSession.isDone());
        Assertions.assertFalse(releaseSession.isCompletedExceptionally());
        Assertions.assertEquals(0, shareSessionCache.size());
    }

    @Test
    public void testShareFetchContextResponseSize() {
        Time mockTime = new MockTime();
        ShareSessionCache shareSessionCache = new ShareSessionCache(10, 1000L);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(shareSessionCache).withTime(mockTime).build();
        HashMap hashMap = new HashMap();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        hashMap.put(randomUuid, "foo");
        hashMap.put(randomUuid2, "bar");
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 0));
        TopicIdPartition topicIdPartition4 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 1));
        String str = "grp";
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ObjectSerializationCache objectSerializationCache = new ObjectSerializationCache();
        short latestVersion = ApiKeys.SHARE_FETCH.latestVersion();
        ShareRequestMetadata shareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareRequestMetadata, false);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        int responseSize = newContext.responseSize(linkedHashMap2, latestVersion);
        ShareFetchResponse updateAndGenerateResponseData = newContext.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData.error());
        Assertions.assertEquals(linkedHashMap2, updateAndGenerateResponseData.responseData(hashMap));
        Assertions.assertEquals(4 + updateAndGenerateResponseData.data().size(objectSerializationCache, latestVersion), responseSize);
        ShareSessionKey shareSessionKey = new ShareSessionKey("grp", shareRequestMetadata.memberId());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), MAX_DELIVERY_COUNT), true);
        });
        Uuid randomUuid3 = Uuid.randomUuid();
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareRequestMetadata(randomUuid3, 1), true);
        });
        ShareSessionContext newContext2 = build.newContext("grp", Collections.singletonMap(topicIdPartition3, new ShareFetchRequest.SharePartitionData(topicIdPartition3.topicId(), 100)), EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), 1), true);
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        Assertions.assertTrue(newContext2.isSubsequent());
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        linkedHashMap3.put(topicIdPartition3, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        int responseSize2 = newContext2.responseSize(linkedHashMap3, latestVersion);
        ShareFetchResponse updateAndGenerateResponseData2 = newContext2.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap3);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData2.error());
        Assertions.assertEquals(4 + updateAndGenerateResponseData2.data().size(objectSerializationCache, latestVersion), responseSize2);
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), MAX_DELIVERY_COUNT), true);
        });
        ShareFetchContext newContext3 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), 2), true);
        int responseSize3 = newContext3.responseSize(linkedHashMap2, latestVersion);
        ShareFetchResponse throttleResponse = newContext3.throttleResponse(100);
        Assertions.assertEquals(Errors.NONE, throttleResponse.error());
        Assertions.assertEquals(100, throttleResponse.throttleTimeMs());
        Assertions.assertEquals(4 + new ShareFetchResponseData().size(objectSerializationCache, latestVersion), responseSize3);
        ShareFetchContext newContext4 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareRequestMetadata.memberId(), -1), true);
        Assertions.assertEquals(FinalContext.class, newContext4.getClass());
        Assertions.assertEquals(1, shareSessionCache.size());
        LinkedHashMap linkedHashMap4 = new LinkedHashMap();
        linkedHashMap4.put(topicIdPartition4, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        int responseSize4 = newContext4.responseSize(linkedHashMap4, latestVersion);
        ShareFetchResponse updateAndGenerateResponseData3 = newContext4.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap4);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData3.error());
        Assertions.assertEquals(4 + updateAndGenerateResponseData3.data().size(objectSerializationCache, latestVersion), responseSize4);
    }

    @Test
    public void testCachedTopicPartitionsWithNoTopicPartitions() {
        Assertions.assertTrue(SharePartitionManagerBuilder.builder().withCache(new ShareSessionCache(10, 1000L)).build().cachedTopicIdPartitionsInShareSession("grp", Uuid.randomUuid()).isEmpty());
    }

    @Test
    public void testCachedTopicPartitionsForValidShareSessions() {
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(new ShareSessionCache(10, 1000L)).build();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 0));
        TopicIdPartition topicIdPartition4 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 1));
        Uuid randomUuid3 = Uuid.randomUuid();
        Uuid randomUuid4 = Uuid.randomUuid();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareRequestMetadata shareRequestMetadata = new ShareRequestMetadata(randomUuid3, 0);
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareRequestMetadata, false);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        ShareSessionKey shareSessionKey = new ShareSessionKey("grp", shareRequestMetadata.memberId());
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        Assertions.assertEquals(Errors.NONE, newContext.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap2).error());
        Assertions.assertEquals(new HashSet(Arrays.asList(topicIdPartition, topicIdPartition2)), new HashSet(build.cachedTopicIdPartitionsInShareSession("grp", randomUuid3)));
        Map singletonMap = Collections.singletonMap(topicIdPartition3, new ShareFetchRequest.SharePartitionData(topicIdPartition3.topicId(), 100));
        ShareRequestMetadata shareRequestMetadata2 = new ShareRequestMetadata(randomUuid4, 0);
        ShareSessionContext newContext2 = build.newContext("grp", singletonMap, EMPTY_PART_LIST, shareRequestMetadata2, false);
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        Assertions.assertFalse(newContext2.isSubsequent());
        ShareSessionKey shareSessionKey2 = new ShareSessionKey("grp", shareRequestMetadata2.memberId());
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        linkedHashMap3.put(topicIdPartition3, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        Assertions.assertEquals(Errors.NONE, newContext2.updateAndGenerateResponseData("grp", shareRequestMetadata2.memberId(), linkedHashMap3).error());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition3), build.cachedTopicIdPartitionsInShareSession("grp", randomUuid4));
        ShareSessionContext newContext3 = build.newContext("grp", Collections.singletonMap(topicIdPartition3, new ShareFetchRequest.SharePartitionData(topicIdPartition3.topicId(), 100)), EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey.memberId(), 1), true);
        Assertions.assertEquals(ShareSessionContext.class, newContext3.getClass());
        Assertions.assertTrue(newContext3.isSubsequent());
        LinkedHashMap linkedHashMap4 = new LinkedHashMap();
        linkedHashMap4.put(topicIdPartition3, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        Assertions.assertEquals(Errors.NONE, newContext3.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), linkedHashMap4).error());
        Assertions.assertEquals(new HashSet(Arrays.asList(topicIdPartition, topicIdPartition2, topicIdPartition3)), new HashSet(build.cachedTopicIdPartitionsInShareSession("grp", randomUuid3)));
        ShareSessionContext newContext4 = build.newContext("grp", Collections.singletonMap(topicIdPartition4, new ShareFetchRequest.SharePartitionData(topicIdPartition4.topicId(), 100)), Collections.singletonList(topicIdPartition3), new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true);
        Assertions.assertEquals(ShareSessionContext.class, newContext4.getClass());
        Assertions.assertTrue(newContext4.isSubsequent());
        LinkedHashMap linkedHashMap5 = new LinkedHashMap();
        linkedHashMap5.put(topicIdPartition4, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        Assertions.assertEquals(Errors.NONE, newContext4.updateAndGenerateResponseData("grp", shareRequestMetadata2.memberId(), linkedHashMap5).error());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition4), build.cachedTopicIdPartitionsInShareSession("grp", randomUuid4));
        ShareFetchContext newContext5 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareRequestMetadata(shareRequestMetadata.memberId(), -1), true);
        Assertions.assertEquals(FinalContext.class, newContext5.getClass());
        Assertions.assertEquals(Errors.NONE, newContext5.updateAndGenerateResponseData("grp", shareRequestMetadata.memberId(), new LinkedHashMap()).error());
        Assertions.assertFalse(build.cachedTopicIdPartitionsInShareSession("grp", randomUuid3).isEmpty());
        build.releaseSession("grp", shareRequestMetadata.memberId().toString());
        Assertions.assertTrue(build.cachedTopicIdPartitionsInShareSession("grp", randomUuid3).isEmpty());
        ShareSessionContext newContext6 = build.newContext("grp", Collections.emptyMap(), Collections.singletonList(topicIdPartition4), new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true);
        Assertions.assertEquals(ShareSessionContext.class, newContext6.getClass());
        Assertions.assertTrue(newContext6.isSubsequent());
        Assertions.assertEquals(Errors.NONE, newContext6.updateAndGenerateResponseData("grp", shareRequestMetadata2.memberId(), new LinkedHashMap()).error());
        Assertions.assertEquals(Collections.emptyList(), build.cachedTopicIdPartitionsInShareSession("grp", randomUuid4));
    }

    @Test
    public void testSharePartitionKey() {
        SharePartitionKey sharePartitionKey = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition(RemoteLogReaderTest.TOPIC, 0)));
        SharePartitionKey sharePartitionKey2 = new SharePartitionKey("mock-group-2", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition(RemoteLogReaderTest.TOPIC, 0)));
        SharePartitionKey sharePartitionKey3 = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(1L, 1L), new TopicPartition("test-1", 0)));
        SharePartitionKey sharePartitionKey4 = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition(RemoteLogReaderTest.TOPIC, 1)));
        SharePartitionKey sharePartitionKey5 = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 0L), new TopicPartition("test-2", 0)));
        Assertions.assertEquals(sharePartitionKey, new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition(RemoteLogReaderTest.TOPIC, 0))));
        Assertions.assertNotEquals(sharePartitionKey, sharePartitionKey2);
        Assertions.assertNotEquals(sharePartitionKey, sharePartitionKey3);
        Assertions.assertNotEquals(sharePartitionKey, sharePartitionKey4);
        Assertions.assertNotEquals(sharePartitionKey, sharePartitionKey5);
        Assertions.assertNotEquals(sharePartitionKey, (Object) null);
    }

    @Test
    public void testMultipleSequentialShareFetches() {
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        Uuid randomUuid3 = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid2, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid2, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid3, new TopicPartition("bar", 0));
        TopicIdPartition topicIdPartition4 = new TopicIdPartition(randomUuid3, new TopicPartition("bar", 1));
        TopicIdPartition topicIdPartition5 = new TopicIdPartition(randomUuid2, new TopicPartition("foo", 2));
        TopicIdPartition topicIdPartition6 = new TopicIdPartition(randomUuid3, new TopicPartition("bar", 2));
        TopicIdPartition topicIdPartition7 = new TopicIdPartition(randomUuid2, new TopicPartition("foo", 3));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition3, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition4, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition5, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition6, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition7, Integer.valueOf(PARTITION_MAX_BYTES));
        mockFetchOffsetForTimestamp(this.mockReplicaManager);
        Time time = (Time) Mockito.mock(Time.class);
        Mockito.when(Long.valueOf(time.hiResClockMs())).thenReturn(0L).thenReturn(100L);
        Metrics metrics = new Metrics();
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition2, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition3, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition4, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition5, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition6, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition7, 1);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withReplicaManager(this.mockReplicaManager).withTime(time).withMetrics(metrics).withTimer(this.mockTimer).build();
        ((ReplicaManager) Mockito.doAnswer(invocationOnMock -> {
            return buildLogReadResult(hashMap.keySet());
        }).when(this.mockReplicaManager)).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, hashMap);
        ((ReplicaManager) Mockito.verify(this.mockReplicaManager, Mockito.times(1))).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, hashMap);
        ((ReplicaManager) Mockito.verify(this.mockReplicaManager, Mockito.times(2))).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, hashMap);
        ((ReplicaManager) Mockito.verify(this.mockReplicaManager, Mockito.times(3))).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(metrics.metricName("partition-load-time-avg", "share-group-metrics"), d -> {
            Assertions.assertEquals(d.intValue(), 14, "partition-load-time-avg");
        });
        hashMap2.put(metrics.metricName("partition-load-time-max", "share-group-metrics"), d2 -> {
            Assertions.assertEquals(d2, 100.0d, "partition-load-time-max");
        });
        hashMap2.forEach((metricName, consumer) -> {
            Assertions.assertTrue(metrics.metrics().containsKey(metricName));
            consumer.accept((Double) ((KafkaMetric) metrics.metrics().get(metricName)).metricValue());
        });
    }

    @Test
    public void testMultipleConcurrentShareFetches() throws InterruptedException {
        String str = "grp";
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        Uuid randomUuid3 = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid2, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid2, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid3, new TopicPartition("bar", 0));
        TopicIdPartition topicIdPartition4 = new TopicIdPartition(randomUuid3, new TopicPartition("bar", 1));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition3, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition4, Integer.valueOf(PARTITION_MAX_BYTES));
        Time mockTime = new MockTime(0L, System.currentTimeMillis(), 0L);
        mockFetchOffsetForTimestamp(this.mockReplicaManager);
        DelayedOperationPurgatory delayedOperationPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, delayedOperationPurgatory);
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, delayedOperationPurgatory);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition2, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition3, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition4, 1);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withTime(mockTime).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition3 = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition4 = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(Long.valueOf(sharePartition.nextFetchOffset())).thenReturn(1L, new Long[]{15L, 6L, 30L, 25L});
        Mockito.when(Long.valueOf(sharePartition2.nextFetchOffset())).thenReturn(4L, new Long[]{1L, 18L, 5L});
        Mockito.when(Long.valueOf(sharePartition3.nextFetchOffset())).thenReturn(10L, new Long[]{25L, 26L});
        Mockito.when(Long.valueOf(sharePartition4.nextFetchOffset())).thenReturn(20L, new Long[]{15L, 23L, 16L});
        ((ReplicaManager) Mockito.doAnswer(invocationOnMock -> {
            Assertions.assertEquals(1L, sharePartition.nextFetchOffset());
            Assertions.assertEquals(4L, sharePartition2.nextFetchOffset());
            Assertions.assertEquals(10L, sharePartition3.nextFetchOffset());
            Assertions.assertEquals(20L, sharePartition4.nextFetchOffset());
            return buildLogReadResult(hashMap.keySet());
        }).doAnswer(invocationOnMock2 -> {
            Assertions.assertEquals(15L, sharePartition.nextFetchOffset());
            Assertions.assertEquals(1L, sharePartition2.nextFetchOffset());
            Assertions.assertEquals(25L, sharePartition3.nextFetchOffset());
            Assertions.assertEquals(15L, sharePartition4.nextFetchOffset());
            return buildLogReadResult(hashMap.keySet());
        }).doAnswer(invocationOnMock3 -> {
            Assertions.assertEquals(6L, sharePartition.nextFetchOffset());
            Assertions.assertEquals(18L, sharePartition2.nextFetchOffset());
            Assertions.assertEquals(26L, sharePartition3.nextFetchOffset());
            Assertions.assertEquals(23L, sharePartition4.nextFetchOffset());
            return buildLogReadResult(hashMap.keySet());
        }).doAnswer(invocationOnMock4 -> {
            Assertions.assertEquals(30L, sharePartition.nextFetchOffset());
            Assertions.assertEquals(5L, sharePartition2.nextFetchOffset());
            Assertions.assertEquals(26L, sharePartition3.nextFetchOffset());
            Assertions.assertEquals(16L, sharePartition4.nextFetchOffset());
            return buildLogReadResult(hashMap.keySet());
        }).doAnswer(invocationOnMock5 -> {
            Assertions.assertEquals(25L, sharePartition.nextFetchOffset());
            Assertions.assertEquals(5L, sharePartition2.nextFetchOffset());
            Assertions.assertEquals(26L, sharePartition3.nextFetchOffset());
            Assertions.assertEquals(16L, sharePartition4.nextFetchOffset());
            return buildLogReadResult(hashMap.keySet());
        }).when(this.mockReplicaManager)).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(100);
        for (int i = 0; i != 100; i++) {
            try {
                newFixedThreadPool.submit(() -> {
                    build.fetchMessages(str, randomUuid.toString(), FETCH_PARAMS, hashMap);
                });
                if (i % 10 == 0) {
                    newFixedThreadPool.awaitTermination(50L, TimeUnit.MILLISECONDS);
                }
            } finally {
                if (!newFixedThreadPool.awaitTermination(50L, TimeUnit.MILLISECONDS)) {
                    newFixedThreadPool.shutdown();
                }
            }
        }
        ((ReplicaManager) Mockito.verify(this.mockReplicaManager, Mockito.atMost(100))).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        ((ReplicaManager) Mockito.verify(this.mockReplicaManager, Mockito.atLeast(10))).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
    }

    @Test
    public void testReplicaManagerFetchShouldNotProceed() {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(Boolean.valueOf(sharePartition.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(false);
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
        CompletableFuture fetchMessages = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap2).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build().fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, hashMap);
        ((ReplicaManager) Mockito.verify(this.mockReplicaManager, Mockito.times(0))).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(0, ((Map) fetchMessages.join()).size());
    }

    @Test
    public void testReplicaManagerFetchShouldProceed() {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        Map singletonMap = Collections.singletonMap(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        mockFetchOffsetForTimestamp(this.mockReplicaManager);
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition, 1);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        ((ReplicaManager) Mockito.doAnswer(invocationOnMock -> {
            return buildLogReadResult(singletonMap.keySet());
        }).when(this.mockReplicaManager)).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap);
        ((ReplicaManager) Mockito.verify(this.mockReplicaManager, Mockito.times(1))).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
    }

    @Test
    public void testCloseSharePartitionManager() throws Exception {
        Timer timer = (Timer) Mockito.mock(SystemTimerReaper.class);
        Persister persister = (Persister) Mockito.mock(Persister.class);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withTimer(timer).withShareGroupPersister(persister).build();
        ((Timer) Mockito.verify(timer, Mockito.times(0))).close();
        ((Persister) Mockito.verify(persister, Mockito.times(0))).stop();
        build.close();
        ((Timer) Mockito.verify(timer, Mockito.times(1))).close();
    }

    @Test
    public void testReleaseSessionSuccess() {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 2));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("baz", 4));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(sharePartition.releaseAcquiredRecords((String) ArgumentMatchers.eq(randomUuid.toString()))).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when(sharePartition2.releaseAcquiredRecords((String) ArgumentMatchers.eq(randomUuid.toString()))).thenReturn(FutureUtils.failedFuture(new InvalidRecordStateException("Unable to release acquired records for the batch")));
        ShareSessionCache shareSessionCache = (ShareSessionCache) Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession) Mockito.mock(ShareSession.class);
        Mockito.when(shareSessionCache.get(new ShareSessionKey("grp", randomUuid))).thenReturn(shareSession);
        Mockito.when(shareSessionCache.remove(new ShareSessionKey("grp", randomUuid))).thenReturn(shareSession);
        ImplicitLinkedHashCollection implicitLinkedHashCollection = new ImplicitLinkedHashCollection(3);
        implicitLinkedHashCollection.add(new CachedSharePartition(topicIdPartition));
        implicitLinkedHashCollection.add(new CachedSharePartition(topicIdPartition2));
        implicitLinkedHashCollection.add(new CachedSharePartition(topicIdPartition3));
        Mockito.when(shareSession.partitionMap()).thenReturn(implicitLinkedHashCollection);
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        hashMap.put(new SharePartitionKey("grp", topicIdPartition2), sharePartition2);
        Map map = (Map) SharePartitionManagerBuilder.builder().withCache(shareSessionCache).withPartitionCacheMap(hashMap).build().releaseSession("grp", randomUuid.toString()).join();
        Assertions.assertEquals(3, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertTrue(map.containsKey(topicIdPartition2));
        Assertions.assertTrue(map.containsKey(topicIdPartition3));
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
        Assertions.assertEquals(2, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition2)).partitionIndex());
        Assertions.assertEquals(Errors.INVALID_RECORD_STATE.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition2)).errorCode());
        Assertions.assertEquals("Unable to release acquired records for the batch", ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition2)).errorMessage());
        Assertions.assertEquals(4, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition3)).partitionIndex());
        Assertions.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition3)).errorCode());
        Assertions.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition3)).errorMessage());
    }

    @Test
    public void testReleaseSessionWithIncorrectGroupId() {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        ShareSessionCache shareSessionCache = (ShareSessionCache) Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession) Mockito.mock(ShareSession.class);
        Mockito.when(shareSessionCache.get(new ShareSessionKey("grp", randomUuid))).thenReturn(shareSession);
        ImplicitLinkedHashCollection implicitLinkedHashCollection = new ImplicitLinkedHashCollection(3);
        implicitLinkedHashCollection.add(new CachedSharePartition(topicIdPartition));
        Mockito.when(shareSession.partitionMap()).thenReturn(implicitLinkedHashCollection);
        CompletableFuture releaseSession = SharePartitionManagerBuilder.builder().withCache(shareSessionCache).build().releaseSession("grp-2", randomUuid.toString());
        Assertions.assertTrue(releaseSession.isDone());
        Assertions.assertTrue(releaseSession.isCompletedExceptionally());
        Objects.requireNonNull(releaseSession);
        Assertions.assertInstanceOf(ShareSessionNotFoundException.class, Assertions.assertThrows(ExecutionException.class, releaseSession::get).getCause());
    }

    @Test
    public void testReleaseSessionWithIncorrectMemberId() {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        ShareSessionCache shareSessionCache = (ShareSessionCache) Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession) Mockito.mock(ShareSession.class);
        Mockito.when(shareSessionCache.get(new ShareSessionKey("grp", Uuid.randomUuid()))).thenReturn(shareSession);
        ImplicitLinkedHashCollection implicitLinkedHashCollection = new ImplicitLinkedHashCollection(3);
        implicitLinkedHashCollection.add(new CachedSharePartition(topicIdPartition));
        Mockito.when(shareSession.partitionMap()).thenReturn(implicitLinkedHashCollection);
        CompletableFuture releaseSession = SharePartitionManagerBuilder.builder().withCache(shareSessionCache).build().releaseSession("grp", randomUuid.toString());
        Assertions.assertTrue(releaseSession.isDone());
        Assertions.assertTrue(releaseSession.isCompletedExceptionally());
        Objects.requireNonNull(releaseSession);
        Assertions.assertInstanceOf(ShareSessionNotFoundException.class, Assertions.assertThrows(ExecutionException.class, releaseSession::get).getCause());
    }

    @Test
    public void testReleaseSessionWithEmptyTopicPartitions() {
        Uuid randomUuid = Uuid.randomUuid();
        ShareSessionCache shareSessionCache = (ShareSessionCache) Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession) Mockito.mock(ShareSession.class);
        Mockito.when(shareSessionCache.get(new ShareSessionKey("grp", randomUuid))).thenReturn(shareSession);
        Mockito.when(shareSessionCache.remove(new ShareSessionKey("grp", randomUuid))).thenReturn(shareSession);
        Mockito.when(shareSession.partitionMap()).thenReturn(new ImplicitLinkedHashCollection());
        Assertions.assertEquals(0, ((Map) SharePartitionManagerBuilder.builder().withCache(shareSessionCache).build().releaseSession("grp", randomUuid.toString()).join()).size());
    }

    @Test
    public void testReleaseSessionWithNullShareSession() {
        Uuid randomUuid = Uuid.randomUuid();
        ShareSessionCache shareSessionCache = (ShareSessionCache) Mockito.mock(ShareSessionCache.class);
        Mockito.when(shareSessionCache.get(new ShareSessionKey("grp", randomUuid))).thenReturn((Object) null);
        Mockito.when(shareSessionCache.remove(new ShareSessionKey("grp", randomUuid))).thenReturn((ShareSession) Mockito.mock(ShareSession.class));
        Assertions.assertEquals(0, ((Map) SharePartitionManagerBuilder.builder().withCache(shareSessionCache).build().releaseSession("grp", randomUuid.toString()).join()).size());
    }

    @Test
    public void testAcknowledgeSinglePartition() {
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(sharePartition.acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).build();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(topicIdPartition, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte) 1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte) 1))));
        Map map = (Map) build.acknowledge(uuid, "grp", hashMap2).join();
        Assertions.assertEquals(1, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
    }

    @Test
    public void testAcknowledgeMultiplePartition() {
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition3 = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(sharePartition.acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when(sharePartition2.acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when(sharePartition3.acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        hashMap.put(new SharePartitionKey("grp", topicIdPartition2), sharePartition2);
        hashMap.put(new SharePartitionKey("grp", topicIdPartition3), sharePartition3);
        Metrics metrics = new Metrics();
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).withMetrics(metrics).build();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(topicIdPartition, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte) 1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte) 1))));
        hashMap2.put(topicIdPartition2, Arrays.asList(new ShareAcknowledgementBatch(15L, 26L, Collections.singletonList((byte) 2)), new ShareAcknowledgementBatch(34L, 56L, Collections.singletonList((byte) 2))));
        hashMap2.put(topicIdPartition3, Arrays.asList(new ShareAcknowledgementBatch(4L, 15L, Collections.singletonList((byte) 3)), new ShareAcknowledgementBatch(16L, 21L, Collections.singletonList((byte) 3))));
        Map map = (Map) build.acknowledge(uuid, "grp", hashMap2).join();
        Assertions.assertEquals(3, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertTrue(map.containsKey(topicIdPartition2));
        Assertions.assertTrue(map.containsKey(topicIdPartition3));
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition2)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition2)).errorCode());
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition3)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition3)).errorCode());
        HashMap hashMap3 = new HashMap();
        hashMap3.put(metrics.metricName("share-acknowledgement-count", "share-group-metrics"), d -> {
            Assertions.assertEquals(d, 1.0d);
        });
        hashMap3.put(metrics.metricName("share-acknowledgement-rate", "share-group-metrics"), d2 -> {
            Assertions.assertTrue(d2.doubleValue() > 0.0d);
        });
        hashMap3.put(metrics.metricName("record-acknowledgement-count", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.ACCEPT.toString())), d3 -> {
            Assertions.assertEquals(2.0d, d3);
        });
        hashMap3.put(metrics.metricName("record-acknowledgement-count", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.RELEASE.toString())), d4 -> {
            Assertions.assertEquals(2.0d, d4);
        });
        hashMap3.put(metrics.metricName("record-acknowledgement-count", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.REJECT.toString())), d5 -> {
            Assertions.assertEquals(2.0d, d5);
        });
        hashMap3.put(metrics.metricName("record-acknowledgement-rate", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.ACCEPT.toString())), d6 -> {
            Assertions.assertTrue(d6.doubleValue() > 0.0d);
        });
        hashMap3.put(metrics.metricName("record-acknowledgement-rate", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.RELEASE.toString())), d7 -> {
            Assertions.assertTrue(d7.doubleValue() > 0.0d);
        });
        hashMap3.put(metrics.metricName("record-acknowledgement-rate", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.REJECT.toString())), d8 -> {
            Assertions.assertTrue(d8.doubleValue() > 0.0d);
        });
        hashMap3.forEach((metricName, consumer) -> {
            Assertions.assertTrue(metrics.metrics().containsKey(metricName));
            consumer.accept((Double) ((KafkaMetric) metrics.metrics().get(metricName)).metricValue());
        });
    }

    @Test
    public void testAcknowledgeIncorrectGroupId() {
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).build();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(topicIdPartition, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte) 1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte) 1))));
        Map map = (Map) build.acknowledge(uuid, "grp2", hashMap2).join();
        Assertions.assertEquals(1, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
        Assertions.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorMessage());
    }

    @Test
    public void testAcknowledgeIncorrectMemberId() {
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(sharePartition.acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any())).thenReturn(FutureUtils.failedFuture(new InvalidRequestException("Member is not the owner of batch record")));
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).build();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(topicIdPartition, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte) 1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte) 1))));
        Map map = (Map) build.acknowledge(uuid, "grp", hashMap2).join();
        Assertions.assertEquals(1, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(Errors.INVALID_REQUEST.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
        Assertions.assertEquals("Member is not the owner of batch record", ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorMessage());
    }

    @Test
    public void testAcknowledgeEmptyPartitionCacheMap() {
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 3));
        SharePartitionManager build = SharePartitionManagerBuilder.builder().build();
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Arrays.asList(new ShareAcknowledgementBatch(78L, 90L, Collections.singletonList((byte) 2)), new ShareAcknowledgementBatch(94L, 99L, Collections.singletonList((byte) 2))));
        Map map = (Map) build.acknowledge(uuid, "grp", hashMap).join();
        Assertions.assertEquals(1, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertEquals(3, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
        Assertions.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorMessage());
    }

    @Test
    public void testAcknowledgeCompletesDelayedShareFetchRequest() {
        String str = "grp";
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        ((SharePartition) Mockito.doAnswer(invocationOnMock -> {
            Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when(sharePartition)).acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any());
        ((SharePartition) Mockito.doAnswer(invocationOnMock2 -> {
            Mockito.when(Boolean.valueOf(sharePartition2.canAcquireRecords())).thenReturn(true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when(sharePartition2)).acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition2), sharePartition2);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), new CompletableFuture(), hashMap, 100);
        DelayedOperationPurgatory delayedOperationPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, delayedOperationPurgatory);
        Mockito.when(sharePartition.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition, 2);
        Mockito.when(Boolean.valueOf(sharePartition.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(false);
        Mockito.when(Boolean.valueOf(sharePartition2.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition2.canAcquireRecords())).thenReturn(false);
        ArrayList arrayList = new ArrayList();
        hashMap.keySet().forEach(topicIdPartition3 -> {
            arrayList.add(new DelayedShareFetchGroupKey(str, topicIdPartition3.topicId(), topicIdPartition3.partition()));
        });
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap2).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        LinkedHashMap<TopicIdPartition, SharePartition> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(topicIdPartition, sharePartition);
        linkedHashMap.put(topicIdPartition2, sharePartition2);
        DelayedShareFetch build2 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(this.mockReplicaManager).withSharePartitions(linkedHashMap).build();
        delayedOperationPurgatory.tryCompleteElseWatch(build2, arrayList);
        Assertions.assertEquals(2, delayedOperationPurgatory.watched());
        ((ReplicaManager) Mockito.doAnswer(invocationOnMock3 -> {
            return buildLogReadResult(hashMap.keySet());
        }).when(this.mockReplicaManager)).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        HashMap hashMap3 = new HashMap();
        hashMap3.put(topicIdPartition, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte) 1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte) 1))));
        Assertions.assertEquals(2, delayedOperationPurgatory.watched());
        build.acknowledge(uuid, "grp", hashMap3);
        Assertions.assertEquals(1, delayedOperationPurgatory.watched());
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(1))).nextFetchOffset();
        ((SharePartition) Mockito.verify(sharePartition2, Mockito.times(0))).nextFetchOffset();
        Assertions.assertTrue(build2.lock().tryLock());
        build2.lock().unlock();
    }

    @Test
    public void testAcknowledgeDoesNotCompleteDelayedShareFetchRequest() {
        String str = "grp";
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition3 = (SharePartition) Mockito.mock(SharePartition.class);
        ((SharePartition) Mockito.doAnswer(invocationOnMock -> {
            Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when(sharePartition)).acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any());
        ((SharePartition) Mockito.doAnswer(invocationOnMock2 -> {
            Mockito.when(Boolean.valueOf(sharePartition2.canAcquireRecords())).thenReturn(true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when(sharePartition2)).acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any());
        ((SharePartition) Mockito.doAnswer(invocationOnMock3 -> {
            Mockito.when(Boolean.valueOf(sharePartition3.canAcquireRecords())).thenReturn(true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when(sharePartition3)).acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition2), sharePartition2);
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition3), sharePartition3);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), new CompletableFuture(), hashMap, 100);
        DelayedOperationPurgatory delayedOperationPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, delayedOperationPurgatory);
        Mockito.when(Boolean.valueOf(sharePartition.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(false);
        Mockito.when(Boolean.valueOf(sharePartition2.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition2.canAcquireRecords())).thenReturn(false);
        Mockito.when(Boolean.valueOf(sharePartition3.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition3.canAcquireRecords())).thenReturn(false);
        ArrayList arrayList = new ArrayList();
        hashMap.keySet().forEach(topicIdPartition4 -> {
            arrayList.add(new DelayedShareFetchGroupKey(str, topicIdPartition4.topicId(), topicIdPartition4.partition()));
        });
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap2).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        LinkedHashMap<TopicIdPartition, SharePartition> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(topicIdPartition, sharePartition);
        linkedHashMap.put(topicIdPartition2, sharePartition2);
        linkedHashMap.put(topicIdPartition3, sharePartition3);
        DelayedShareFetch build2 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(this.mockReplicaManager).withSharePartitions(linkedHashMap).build();
        delayedOperationPurgatory.tryCompleteElseWatch(build2, arrayList);
        Assertions.assertEquals(2, delayedOperationPurgatory.watched());
        HashMap hashMap3 = new HashMap();
        hashMap3.put(topicIdPartition3, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte) 1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte) 1))));
        build.acknowledge(uuid, "grp", hashMap3);
        Assertions.assertEquals(2, delayedOperationPurgatory.watched());
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(0))).nextFetchOffset();
        ((SharePartition) Mockito.verify(sharePartition2, Mockito.times(0))).nextFetchOffset();
        Assertions.assertTrue(build2.lock().tryLock());
        build2.lock().unlock();
    }

    @Test
    public void testReleaseSessionCompletesDelayedShareFetchRequest() {
        String str = "grp";
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        ShareSessionCache shareSessionCache = (ShareSessionCache) Mockito.mock(ShareSessionCache.class);
        Mockito.when(shareSessionCache.remove(new ShareSessionKey("grp", Uuid.fromString(uuid)))).thenReturn((ShareSession) Mockito.mock(ShareSession.class));
        ((SharePartition) Mockito.doAnswer(invocationOnMock -> {
            Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when(sharePartition)).releaseAcquiredRecords((String) ArgumentMatchers.eq(uuid));
        ((SharePartition) Mockito.doAnswer(invocationOnMock2 -> {
            Mockito.when(Boolean.valueOf(sharePartition2.canAcquireRecords())).thenReturn(true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when(sharePartition2)).releaseAcquiredRecords((String) ArgumentMatchers.eq(uuid));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition2), sharePartition2);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), new CompletableFuture(), hashMap, 100);
        DelayedOperationPurgatory delayedOperationPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, delayedOperationPurgatory);
        Mockito.when(sharePartition.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, topicIdPartition, 1);
        Mockito.when(Boolean.valueOf(sharePartition.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(false);
        Mockito.when(Boolean.valueOf(sharePartition2.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition2.canAcquireRecords())).thenReturn(false);
        ArrayList arrayList = new ArrayList();
        hashMap.keySet().forEach(topicIdPartition4 -> {
            arrayList.add(new DelayedShareFetchGroupKey(str, topicIdPartition4.topicId(), topicIdPartition4.partition()));
        });
        SharePartitionManager sharePartitionManager = (SharePartitionManager) Mockito.spy(SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap2).withCache(shareSessionCache).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build());
        LinkedHashMap<TopicIdPartition, SharePartition> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(topicIdPartition, sharePartition);
        linkedHashMap.put(topicIdPartition2, sharePartition2);
        DelayedShareFetch build = DelayedShareFetchTest.DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(this.mockReplicaManager).withSharePartitions(linkedHashMap).build();
        delayedOperationPurgatory.tryCompleteElseWatch(build, arrayList);
        Assertions.assertEquals(2, delayedOperationPurgatory.watched());
        ((ReplicaManager) Mockito.doAnswer(invocationOnMock3 -> {
            return buildLogReadResult(hashMap.keySet());
        }).when(this.mockReplicaManager)).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(2, delayedOperationPurgatory.watched());
        Mockito.when(sharePartitionManager.cachedTopicIdPartitionsInShareSession("grp", Uuid.fromString(uuid))).thenReturn(Arrays.asList(topicIdPartition, topicIdPartition3));
        sharePartitionManager.releaseSession("grp", uuid);
        Assertions.assertEquals(1, delayedOperationPurgatory.watched());
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(1))).nextFetchOffset();
        ((SharePartition) Mockito.verify(sharePartition2, Mockito.times(0))).nextFetchOffset();
        Assertions.assertTrue(build.lock().tryLock());
        build.lock().unlock();
    }

    @Test
    public void testReleaseSessionDoesNotCompleteDelayedShareFetchRequest() {
        String str = "grp";
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition3 = (SharePartition) Mockito.mock(SharePartition.class);
        ShareSessionCache shareSessionCache = (ShareSessionCache) Mockito.mock(ShareSessionCache.class);
        Mockito.when(shareSessionCache.remove(new ShareSessionKey("grp", Uuid.fromString(uuid)))).thenReturn((ShareSession) Mockito.mock(ShareSession.class));
        ((SharePartition) Mockito.doAnswer(invocationOnMock -> {
            Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when(sharePartition)).releaseAcquiredRecords((String) ArgumentMatchers.eq(uuid));
        ((SharePartition) Mockito.doAnswer(invocationOnMock2 -> {
            Mockito.when(Boolean.valueOf(sharePartition2.canAcquireRecords())).thenReturn(true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when(sharePartition2)).releaseAcquiredRecords((String) ArgumentMatchers.eq(uuid));
        ((SharePartition) Mockito.doAnswer(invocationOnMock3 -> {
            Mockito.when(Boolean.valueOf(sharePartition3.canAcquireRecords())).thenReturn(true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when(sharePartition3)).releaseAcquiredRecords((String) ArgumentMatchers.eq(uuid));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition2), sharePartition2);
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition3), sharePartition3);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), new CompletableFuture(), hashMap, 100);
        DelayedOperationPurgatory delayedOperationPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, delayedOperationPurgatory);
        Mockito.when(Boolean.valueOf(sharePartition.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(false);
        Mockito.when(Boolean.valueOf(sharePartition2.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition2.canAcquireRecords())).thenReturn(false);
        Mockito.when(Boolean.valueOf(sharePartition3.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition3.canAcquireRecords())).thenReturn(false);
        ArrayList arrayList = new ArrayList();
        hashMap.keySet().forEach(topicIdPartition4 -> {
            arrayList.add(new DelayedShareFetchGroupKey(str, topicIdPartition4.topicId(), topicIdPartition4.partition()));
        });
        SharePartitionManager sharePartitionManager = (SharePartitionManager) Mockito.spy(SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap2).withCache(shareSessionCache).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build());
        LinkedHashMap<TopicIdPartition, SharePartition> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(topicIdPartition, sharePartition);
        linkedHashMap.put(topicIdPartition2, sharePartition2);
        linkedHashMap.put(topicIdPartition3, sharePartition3);
        DelayedShareFetch build = DelayedShareFetchTest.DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(this.mockReplicaManager).withSharePartitions(linkedHashMap).build();
        delayedOperationPurgatory.tryCompleteElseWatch(build, arrayList);
        Assertions.assertEquals(2, delayedOperationPurgatory.watched());
        Mockito.when(sharePartitionManager.cachedTopicIdPartitionsInShareSession("grp", Uuid.fromString(uuid))).thenReturn(Collections.singletonList(topicIdPartition3));
        sharePartitionManager.releaseSession("grp", uuid);
        Assertions.assertEquals(2, delayedOperationPurgatory.watched());
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(0))).nextFetchOffset();
        ((SharePartition) Mockito.verify(sharePartition2, Mockito.times(0))).nextFetchOffset();
        Assertions.assertTrue(build.lock().tryLock());
        build.lock().unlock();
    }

    @Test
    public void testPendingInitializationShouldCompleteFetchRequest() throws Exception {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        Map singletonMap = Collections.singletonMap(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(completableFuture);
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
        CompletableFuture fetchMessages = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build().fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap);
        Objects.requireNonNull(fetchMessages);
        TestUtils.waitForCondition(fetchMessages::isDone, 3000L, () -> {
            return "Processing in delayed share fetch queue never ended.";
        });
        Assertions.assertTrue(((Map) fetchMessages.join()).isEmpty());
        ((ReplicaManager) Mockito.verify(this.mockReplicaManager, Mockito.times(0))).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Assertions.assertFalse(completableFuture.isDone());
        completableFuture.complete(null);
    }

    @Test
    public void testDelayedInitializationShouldCompleteFetchRequest() throws Exception {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        Map singletonMap = Collections.singletonMap(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(completableFuture).thenReturn(completableFuture2).thenReturn(CompletableFuture.failedFuture(new LeaderNotAvailableException("Leader not available")));
        DelayedOperationPurgatory delayedOperationPurgatory = (DelayedOperationPurgatory) Mockito.spy(new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, delayedOperationPurgatory);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        CompletableFuture fetchMessages = build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap);
        CompletableFuture fetchMessages2 = build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap);
        CompletableFuture fetchMessages3 = build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap);
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(3))).maybeInitialize();
        ((ReplicaManager) Mockito.verify(this.mockReplicaManager, Mockito.times(3))).addDelayedShareFetchRequest((DelayedShareFetch) ArgumentMatchers.any(), (List) ArgumentMatchers.any());
        ((DelayedOperationPurgatory) Mockito.verify(delayedOperationPurgatory, Mockito.times(3))).tryCompleteElseWatch((DelayedShareFetch) ArgumentMatchers.any(), (List) ArgumentMatchers.any());
        ((DelayedOperationPurgatory) Mockito.verify(delayedOperationPurgatory, Mockito.times(0))).checkAndComplete((DelayedOperationKey) ArgumentMatchers.any());
        Assertions.assertFalse(fetchMessages.isDone());
        Assertions.assertFalse(fetchMessages2.isDone());
        Assertions.assertFalse(fetchMessages3.isDone());
        completableFuture.complete(null);
        ((ReplicaManager) Mockito.verify(this.mockReplicaManager, Mockito.times(1))).completeDelayedShareFetchRequest((DelayedShareFetchKey) ArgumentMatchers.any());
        ((DelayedOperationPurgatory) Mockito.verify(delayedOperationPurgatory, Mockito.times(1))).checkAndComplete((DelayedOperationKey) ArgumentMatchers.any());
        completableFuture2.complete(null);
        ((ReplicaManager) Mockito.verify(this.mockReplicaManager, Mockito.times(2))).completeDelayedShareFetchRequest((DelayedShareFetchKey) ArgumentMatchers.any());
        ((DelayedOperationPurgatory) Mockito.verify(delayedOperationPurgatory, Mockito.times(2))).checkAndComplete((DelayedOperationKey) ArgumentMatchers.any());
        ((ReplicaManager) Mockito.verify(this.mockReplicaManager, Mockito.times(0))).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
    }

    @Test
    public void testSharePartitionInitializationExceptions() throws Exception {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        Map singletonMap = Collections.singletonMap(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new LeaderNotAvailableException("Leader not available")));
        CompletableFuture fetchMessages = build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap);
        Objects.requireNonNull(fetchMessages);
        TestUtils.waitForCondition(fetchMessages::isDone, 3000L, () -> {
            return "Processing in delayed share fetch queue never ended.";
        });
        Assertions.assertFalse(fetchMessages.isCompletedExceptionally());
        Assertions.assertTrue(((Map) fetchMessages.join()).isEmpty());
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(0))).markFenced();
        Assertions.assertEquals(1, hashMap.size());
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new IllegalStateException("Illegal state")));
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages2 = build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap);
        Objects.requireNonNull(fetchMessages2);
        TestUtils.waitForCondition(fetchMessages2::isDone, 3000L, () -> {
            return "Processing in delayed share fetch queue never ended.";
        });
        validateShareFetchFutureException(fetchMessages2, topicIdPartition, Errors.UNKNOWN_SERVER_ERROR, "Illegal state");
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(1))).markFenced();
        Assertions.assertTrue(hashMap.isEmpty());
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new CoordinatorNotAvailableException("Coordinator not available")));
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages3 = build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap);
        Objects.requireNonNull(fetchMessages3);
        TestUtils.waitForCondition(fetchMessages3::isDone, 3000L, () -> {
            return "Processing in delayed share fetch queue never ended.";
        });
        validateShareFetchFutureException(fetchMessages3, topicIdPartition, Errors.COORDINATOR_NOT_AVAILABLE, "Coordinator not available");
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(2))).markFenced();
        Assertions.assertTrue(hashMap.isEmpty());
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new InvalidRequestException("Invalid request")));
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages4 = build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap);
        Objects.requireNonNull(fetchMessages4);
        TestUtils.waitForCondition(fetchMessages4::isDone, 3000L, () -> {
            return "Processing in delayed share fetch queue never ended.";
        });
        validateShareFetchFutureException(fetchMessages4, topicIdPartition, Errors.INVALID_REQUEST, "Invalid request");
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(3))).markFenced();
        Assertions.assertTrue(hashMap.isEmpty());
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new FencedStateEpochException("Fenced state epoch")));
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages5 = build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap);
        Objects.requireNonNull(fetchMessages5);
        TestUtils.waitForCondition(fetchMessages5::isDone, 3000L, () -> {
            return "Processing in delayed share fetch queue never ended.";
        });
        validateShareFetchFutureException(fetchMessages5, topicIdPartition, Errors.FENCED_STATE_EPOCH, "Fenced state epoch");
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(4))).markFenced();
        Assertions.assertTrue(hashMap.isEmpty());
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new NotLeaderOrFollowerException("Not leader or follower")));
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages6 = build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap);
        Objects.requireNonNull(fetchMessages6);
        TestUtils.waitForCondition(fetchMessages6::isDone, 3000L, () -> {
            return "Processing in delayed share fetch queue never ended.";
        });
        validateShareFetchFutureException(fetchMessages6, topicIdPartition, Errors.NOT_LEADER_OR_FOLLOWER, "Not leader or follower");
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(MAX_DELIVERY_COUNT))).markFenced();
        Assertions.assertTrue(hashMap.isEmpty());
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new RuntimeException("Runtime exception")));
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages7 = build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap);
        Objects.requireNonNull(fetchMessages7);
        TestUtils.waitForCondition(fetchMessages7::isDone, 3000L, () -> {
            return "Processing in delayed share fetch queue never ended.";
        });
        validateShareFetchFutureException(fetchMessages7, topicIdPartition, Errors.UNKNOWN_SERVER_ERROR, "Runtime exception");
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(6))).markFenced();
        Assertions.assertTrue(hashMap.isEmpty());
    }

    @Test
    public void testShareFetchProcessingExceptions() throws Exception {
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        Map singletonMap = Collections.singletonMap(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        Map<SharePartitionKey, SharePartition> map = (Map) Mockito.mock(Map.class);
        Mockito.when(map.computeIfAbsent((SharePartitionKey) ArgumentMatchers.any(), (Function) ArgumentMatchers.any())).thenThrow(new Throwable[]{new RuntimeException("Error creating instance")});
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages = SharePartitionManagerBuilder.builder().withPartitionCacheMap(map).build().fetchMessages("grp", Uuid.randomUuid().toString(), FETCH_PARAMS, singletonMap);
        Objects.requireNonNull(fetchMessages);
        TestUtils.waitForCondition(fetchMessages::isDone, 3000L, () -> {
            return "Processing for delayed share fetch request not finished.";
        });
        validateShareFetchFutureException(fetchMessages, topicIdPartition, Errors.UNKNOWN_SERVER_ERROR, "Error creating instance");
    }

    @Test
    public void testSharePartitionInitializationFailure() throws Exception {
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        Map singletonMap = Collections.singletonMap(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        HashMap hashMap = new HashMap();
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(Boolean.valueOf(partition.isLeader())).thenReturn(false);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenThrow(new Throwable[]{new KafkaStorageException("Exception")}).thenReturn(partition);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withReplicaManager(replicaManager).withPartitionCacheMap(hashMap).build();
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages = build.fetchMessages("grp", Uuid.randomUuid().toString(), FETCH_PARAMS, singletonMap);
        Objects.requireNonNull(fetchMessages);
        TestUtils.waitForCondition(fetchMessages::isDone, 3000L, () -> {
            return "Processing for delayed share fetch request not finished.";
        });
        validateShareFetchFutureException(fetchMessages, topicIdPartition, Errors.KAFKA_STORAGE_ERROR, "Exception");
        Assertions.assertTrue(hashMap.isEmpty());
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages2 = build.fetchMessages("grp", Uuid.randomUuid().toString(), FETCH_PARAMS, singletonMap);
        Objects.requireNonNull(fetchMessages2);
        TestUtils.waitForCondition(fetchMessages2::isDone, 3000L, () -> {
            return "Processing for delayed share fetch request not finished.";
        });
        validateShareFetchFutureException(fetchMessages2, topicIdPartition, Errors.NOT_LEADER_OR_FOLLOWER);
        Assertions.assertTrue(hashMap.isEmpty());
    }

    @Test
    public void testSharePartitionPartialInitializationFailure() throws Exception {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 2));
        Map of = Map.of(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES), topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES), topicIdPartition3, Integer.valueOf(PARTITION_MAX_BYTES));
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(Boolean.valueOf(partition.isLeader())).thenReturn(false);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(partition);
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionKey("grp", topicIdPartition2), sharePartition);
        Mockito.when(Boolean.valueOf(sharePartition.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(true);
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when(sharePartition.acquire(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), (FetchPartitionData) ArgumentMatchers.any())).thenReturn(new ShareAcquiredRecords(Collections.emptyList(), 0));
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        hashMap.put(new SharePartitionKey("grp", topicIdPartition3), sharePartition2);
        Mockito.when(sharePartition2.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new FencedStateEpochException("Fenced state epoch")));
        mockReplicaManagerDelayedShareFetch(replicaManager, new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
        Mockito.when(sharePartition.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, topicIdPartition2, 1);
        ((ReplicaManager) Mockito.doAnswer(invocationOnMock -> {
            return buildLogReadResult(Collections.singleton(topicIdPartition2));
        }).when(replicaManager)).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        CompletableFuture fetchMessages = SharePartitionManagerBuilder.builder().withReplicaManager(replicaManager).withPartitionCacheMap(hashMap).build().fetchMessages("grp", Uuid.randomUuid().toString(), FETCH_PARAMS, of);
        Assertions.assertTrue(fetchMessages.isDone());
        Assertions.assertFalse(fetchMessages.isCompletedExceptionally());
        Map map = (Map) fetchMessages.get();
        Assertions.assertEquals(3, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code(), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
        Assertions.assertTrue(map.containsKey(topicIdPartition2));
        Assertions.assertEquals(Errors.NONE.code(), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition2)).errorCode());
        Assertions.assertTrue(map.containsKey(topicIdPartition3));
        Assertions.assertEquals(Errors.FENCED_STATE_EPOCH.code(), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition3)).errorCode());
        Assertions.assertEquals("Fenced state epoch", ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition3)).errorMessage());
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(0))).completeDelayedShareFetchRequest((DelayedShareFetchKey) ArgumentMatchers.any());
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(1))).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
    }

    @Test
    public void testReplicaManagerFetchException() {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        Map singletonMap = Collections.singletonMap(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(Boolean.valueOf(sharePartition.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(true);
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
        ((ReplicaManager) Mockito.doThrow(new Throwable[]{new RuntimeException("Exception")}).when(this.mockReplicaManager)).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        validateShareFetchFutureException(build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap), topicIdPartition, Errors.UNKNOWN_SERVER_ERROR, "Exception");
        Assertions.assertEquals(1, hashMap.size());
        ((ReplicaManager) Mockito.doThrow(new Throwable[]{new NotLeaderOrFollowerException("Leader exception")}).when(this.mockReplicaManager)).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        validateShareFetchFutureException(build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, singletonMap), topicIdPartition, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
        Assertions.assertTrue(hashMap.isEmpty());
    }

    @Test
    public void testReplicaManagerFetchMultipleSharePartitionsException() {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(Boolean.valueOf(sharePartition.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(true);
        Mockito.when(sharePartition.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(Boolean.valueOf(sharePartition2.maybeAcquireFetchLock())).thenReturn(false);
        Mockito.when(sharePartition2.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition2), sharePartition2);
        mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, this.mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
        ((ReplicaManager) Mockito.doThrow(new Throwable[]{new FencedStateEpochException("Fenced exception")}).when(this.mockReplicaManager)).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap2).withReplicaManager(this.mockReplicaManager).withTimer(this.mockTimer).build();
        validateShareFetchFutureException(build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, hashMap), topicIdPartition, Errors.FENCED_STATE_EPOCH, "Fenced exception");
        Assertions.assertEquals(1, hashMap2.size());
        Assertions.assertEquals(sharePartition2, hashMap2.get(new SharePartitionKey("grp", topicIdPartition2)));
        Mockito.when(Boolean.valueOf(sharePartition2.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition2.canAcquireRecords())).thenReturn(true);
        hashMap2.put(new SharePartitionKey("grp", topicIdPartition), sharePartition);
        ((ReplicaManager) Mockito.doThrow(new Throwable[]{new FencedStateEpochException("Fenced exception again")}).when(this.mockReplicaManager)).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        validateShareFetchFutureException(build.fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, hashMap), List.of(topicIdPartition, topicIdPartition2), Errors.FENCED_STATE_EPOCH, "Fenced exception again");
        Assertions.assertTrue(hashMap2.isEmpty());
    }

    @Test
    public void testListenerRegistration() {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.getPartitionOrException((TopicPartition) Mockito.any())).thenReturn(mockPartition());
        SharePartitionManagerBuilder.builder().withReplicaManager(replicaManager).withTimer(this.mockTimer).build().fetchMessages("grp", randomUuid.toString(), FETCH_PARAMS, hashMap);
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(2))).maybeAddListener((TopicPartition) ArgumentMatchers.any(), (PartitionListener) ArgumentMatchers.any());
    }

    @Test
    public void testSharePartitionListenerOnFailed() {
        SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
        HashMap hashMap = new HashMap();
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        SharePartitionManager.SharePartitionListener sharePartitionListener = new SharePartitionManager.SharePartitionListener(sharePartitionKey, replicaManager, hashMap);
        Objects.requireNonNull(sharePartitionListener);
        testSharePartitionListener(sharePartitionKey, hashMap, replicaManager, sharePartitionListener::onFailed);
    }

    @Test
    public void testSharePartitionListenerOnDeleted() {
        SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
        HashMap hashMap = new HashMap();
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        SharePartitionManager.SharePartitionListener sharePartitionListener = new SharePartitionManager.SharePartitionListener(sharePartitionKey, replicaManager, hashMap);
        Objects.requireNonNull(sharePartitionListener);
        testSharePartitionListener(sharePartitionKey, hashMap, replicaManager, sharePartitionListener::onDeleted);
    }

    @Test
    public void testSharePartitionListenerOnBecomingFollower() {
        SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
        HashMap hashMap = new HashMap();
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        SharePartitionManager.SharePartitionListener sharePartitionListener = new SharePartitionManager.SharePartitionListener(sharePartitionKey, replicaManager, hashMap);
        Objects.requireNonNull(sharePartitionListener);
        testSharePartitionListener(sharePartitionKey, hashMap, replicaManager, sharePartitionListener::onBecomingFollower);
    }

    private void testSharePartitionListener(SharePartitionKey sharePartitionKey, Map<SharePartitionKey, SharePartition> map, ReplicaManager replicaManager, Consumer<TopicPartition> consumer) {
        TopicPartition topicPartition = new TopicPartition("foo", 1);
        SharePartitionKey sharePartitionKey2 = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), topicPartition));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        map.put(sharePartitionKey, sharePartition);
        map.put(sharePartitionKey2, sharePartition2);
        consumer.accept(sharePartitionKey.topicIdPartition().topicPartition());
        Assertions.assertEquals(1, map.size());
        Assertions.assertFalse(map.containsKey(sharePartitionKey));
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(1))).markFenced();
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(1))).removeListener((TopicPartition) ArgumentMatchers.any(), (PartitionListener) ArgumentMatchers.any());
        consumer.accept(topicPartition);
        Assertions.assertEquals(1, map.size());
        ((SharePartition) Mockito.verify(sharePartition2, Mockito.times(0))).markFenced();
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(1))).removeListener((TopicPartition) ArgumentMatchers.any(), (PartitionListener) ArgumentMatchers.any());
    }

    private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() {
        return new ShareFetchResponseData.PartitionData().setPartitionIndex(0);
    }

    private ShareFetchResponseData.PartitionData errorShareFetchResponse(Short sh) {
        return new ShareFetchResponseData.PartitionData().setPartitionIndex(0).setErrorCode(sh.shortValue());
    }

    private void mockUpdateAndGenerateResponseData(ShareFetchContext shareFetchContext, String str, Uuid uuid) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        if (shareFetchContext.getClass() == ShareSessionContext.class) {
            ShareSessionContext shareSessionContext = (ShareSessionContext) shareFetchContext;
            if (shareSessionContext.isSubsequent()) {
                synchronized (shareSessionContext.session()) {
                    shareSessionContext.session().partitionMap().forEach(cachedSharePartition -> {
                        TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
                        linkedHashMap.put(topicIdPartition, topicIdPartition.topic() == null ? errorShareFetchResponse(Short.valueOf(Errors.UNKNOWN_TOPIC_ID.code())) : noErrorShareFetchResponse());
                    });
                }
            } else {
                shareSessionContext.shareFetchData().forEach((topicIdPartition, sharePartitionData) -> {
                    linkedHashMap.put(topicIdPartition, topicIdPartition.topic() == null ? errorShareFetchResponse(Short.valueOf(Errors.UNKNOWN_TOPIC_ID.code())) : noErrorShareFetchResponse());
                });
            }
        }
        shareFetchContext.updateAndGenerateResponseData(str, uuid, linkedHashMap);
    }

    private void assertPartitionsPresent(ShareSessionContext shareSessionContext, List<TopicIdPartition> list) {
        HashSet hashSet = new HashSet();
        if (shareSessionContext.isSubsequent()) {
            shareSessionContext.session().partitionMap().forEach(cachedSharePartition -> {
                hashSet.add(new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition())));
            });
        } else {
            shareSessionContext.shareFetchData().forEach((topicIdPartition, sharePartitionData) -> {
                hashSet.add(topicIdPartition);
            });
        }
        Assertions.assertEquals(new HashSet(list), hashSet);
    }

    private void assertErroneousAndValidTopicIdPartitions(ErroneousAndValidPartitionData erroneousAndValidPartitionData, List<TopicIdPartition> list, List<TopicIdPartition> list2) {
        HashSet hashSet = new HashSet(list);
        HashSet hashSet2 = new HashSet(list2);
        HashSet hashSet3 = new HashSet();
        HashSet hashSet4 = new HashSet();
        erroneousAndValidPartitionData.erroneous().forEach((topicIdPartition, partitionData) -> {
            hashSet3.add(topicIdPartition);
        });
        erroneousAndValidPartitionData.validTopicIdPartitions().forEach((topicIdPartition2, sharePartitionData) -> {
            hashSet4.add(topicIdPartition2);
        });
        Assertions.assertEquals(hashSet, hashSet3);
        Assertions.assertEquals(hashSet2, hashSet4);
    }

    private Partition mockPartition() {
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(Boolean.valueOf(partition.isLeader())).thenReturn(true);
        Mockito.when(Integer.valueOf(partition.getLeaderEpoch())).thenReturn(1);
        return partition;
    }

    private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> completableFuture, TopicIdPartition topicIdPartition, Errors errors) {
        validateShareFetchFutureException(completableFuture, Collections.singletonList(topicIdPartition), errors, (String) null);
    }

    private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> completableFuture, TopicIdPartition topicIdPartition, Errors errors, String str) {
        validateShareFetchFutureException(completableFuture, Collections.singletonList(topicIdPartition), errors, str);
    }

    private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> completableFuture, List<TopicIdPartition> list, Errors errors, String str) {
        Assertions.assertFalse(completableFuture.isCompletedExceptionally());
        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> join = completableFuture.join();
        Assertions.assertEquals(list.size(), join.size());
        list.forEach(topicIdPartition -> {
            Assertions.assertTrue(join.containsKey(topicIdPartition));
            Assertions.assertEquals(topicIdPartition.partition(), ((ShareFetchResponseData.PartitionData) join.get(topicIdPartition)).partitionIndex());
            Assertions.assertEquals(errors.code(), ((ShareFetchResponseData.PartitionData) join.get(topicIdPartition)).errorCode());
            Assertions.assertEquals(str, ((ShareFetchResponseData.PartitionData) join.get(topicIdPartition)).errorMessage());
        });
    }

    private void mockFetchOffsetForTimestamp(ReplicaManager replicaManager) {
        ((ReplicaManager) Mockito.doReturn(new OffsetResultHolder(Optional.of(new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty())), Optional.empty())).when(replicaManager)).fetchOffsetForTimestamp((TopicPartition) Mockito.any(TopicPartition.class), Mockito.anyLong(), (Option) Mockito.any(), (Optional) Mockito.any(), Mockito.anyBoolean());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> set) {
        ArrayList arrayList = new ArrayList();
        set.forEach(topicIdPartition -> {
            arrayList.add(new Tuple2(topicIdPartition, new LogReadResult(new FetchDataInfo(new LogOffsetMetadata(0L, 0L, 0), MemoryRecords.EMPTY), Option.empty(), -1L, -1L, -1L, -1L, -1L, Option.empty(), Option.empty(), Option.empty())));
        });
        return CollectionConverters.asScala(arrayList).toSeq();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void mockReplicaManagerDelayedShareFetch(ReplicaManager replicaManager, DelayedOperationPurgatory<DelayedShareFetch> delayedOperationPurgatory) {
        ((ReplicaManager) Mockito.doAnswer(invocationOnMock -> {
            delayedOperationPurgatory.checkAndComplete((DelayedShareFetchKey) invocationOnMock.getArguments()[0]);
            return null;
        }).when(replicaManager)).completeDelayedShareFetchRequest((DelayedShareFetchKey) ArgumentMatchers.any(DelayedShareFetchKey.class));
        ((ReplicaManager) Mockito.doAnswer(invocationOnMock2 -> {
            Object[] arguments = invocationOnMock2.getArguments();
            delayedOperationPurgatory.tryCompleteElseWatch((DelayedShareFetch) arguments[0], (List) arguments[1]);
            return null;
        }).when(replicaManager)).addDelayedShareFetchRequest((DelayedShareFetch) ArgumentMatchers.any(), (List) ArgumentMatchers.any());
    }
}
