package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.class */
public class OffsetsRequestManagerTest {
    // Object under test; a new instance is built in setup() before each test.
    private OffsetsRequestManager requestManager;
    // Mockito mocks created in setup() and handed to the manager's constructor.
    private ConsumerMetadata metadata;
    private SubscriptionState subscriptionState;
    private ApiVersions apiVersions;
    // Fixed topic and partitions shared by the tests.
    private static final String TEST_TOPIC = "t1";
    private static final TopicPartition TEST_PARTITION_1 = new TopicPartition(TEST_TOPIC, 1);
    private static final TopicPartition TEST_PARTITION_2 = new TopicPartition(TEST_TOPIC, 2);
    // NOTE(review): both leader nodes are created with id 0 and differ only by host name — confirm this is intended.
    private static final Node LEADER_1 = new Node(0, "host1", 9092);
    private static final Node LEADER_2 = new Node(0, "host2", 9092);
    // Isolation level passed to the manager's constructor in setup().
    private static final IsolationLevel DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_COMMITTED;
    // Timing values, all 500. NOTE(review): presumably retry backoff, request timeout and default API
    // timeout in milliseconds, per the names — confirm how each maps onto the manager's constructor.
    private static final int RETRY_BACKOFF_MS = 500;
    private static final int REQUEST_TIMEOUT_MS = 500;
    private static final int DEFAULT_API_TIMEOUT_MS = 500;
    // Mocked clock; its milliseconds() value is passed to poll() and to the subscription-state stubs.
    private final Time time = (Time) Mockito.mock(Time.class);
    // Mocked collaborator passed to the manager's constructor in setup().
    private final CommitRequestManager commitRequestManager = (CommitRequestManager) Mockito.mock(CommitRequestManager.class);

    /**
     * Builds a fresh {@link OffsetsRequestManager} backed by Mockito mocks before each test.
     *
     * <p>The three timing arguments are passed through the class-level constants rather than repeated
     * literals; every one of them is 500, so the values handed to the constructor are unchanged.
     * NOTE(review): the constant-to-argument order (retry backoff, request timeout, default API timeout)
     * is presumed from the constant names and their declaration order — confirm against the
     * OffsetsRequestManager constructor.
     */
    @BeforeEach
    public void setup() {
        this.metadata = Mockito.mock(ConsumerMetadata.class);
        this.subscriptionState = Mockito.mock(SubscriptionState.class);
        this.apiVersions = Mockito.mock(ApiVersions.class);
        NetworkClientDelegate networkClientDelegate = Mockito.mock(NetworkClientDelegate.class);
        this.requestManager = new OffsetsRequestManager(
                this.subscriptionState,
                this.metadata,
                DEFAULT_ISOLATION_LEVEL,
                this.time,
                RETRY_BACKOFF_MS,
                REQUEST_TIMEOUT_MS,
                DEFAULT_API_TIMEOUT_MS,
                this.apiVersions,
                networkClientDelegate,
                this.commitRequestManager,
                new LogContext());
    }

    /**
     * Fetches the earliest offset for a single partition and checks that exactly one request is queued
     * to send (none to retry) before handing the future to
     * {@code verifySuccessfulPollAndResponseReceived} together with the expected offset 5 / timestamp -1.
     *
     * @throws ExecutionException   declared because the verification helper may read the future
     * @throws InterruptedException declared because the verification helper may read the future
     */
    @Test
    public void testListOffsetsRequest_Success() throws ExecutionException, InterruptedException {
        Map<TopicPartition, Long> timestampsToSearch =
                Collections.singletonMap(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));

        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
                requestManager.fetchOffsets(timestampsToSearch, false);
        Assertions.assertEquals(1, requestManager.requestsToSend());
        Assertions.assertEquals(0, requestManager.requestsToRetry());

        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = Collections.singletonMap(
                TEST_PARTITION_1, new OffsetAndTimestampInternal(5L, -1L, Optional.empty()));
        verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, expectedOffsets);
    }

    /**
     * After {@code mockFailedRequest_MissingLeader()} (defined outside this view; presumably leaves the
     * partition without a known leader), checks that the fetch is queued for retry instead of being sent,
     * that a metadata update is requested with {@code true}, that polling yields no unsent requests, and
     * that a bounded {@code get} on the returned future times out.
     */
    @Test
    public void testListOffsetsWaitingForMetadataUpdate_Timeout() {
        Map<TopicPartition, Long> timestampsToSearch =
                Collections.singletonMap(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
        mockFailedRequest_MissingLeader();

        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
                requestManager.fetchOffsets(timestampsToSearch, false);
        Assertions.assertEquals(0, requestManager.requestsToSend());
        Assertions.assertEquals(1, requestManager.requestsToRetry());
        Mockito.verify(metadata).requestUpdate(true);

        NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, pollResult.unsentRequests.size());
        // No response is delivered in this test, so the bounded get() is expected to time out.
        Assertions.assertThrows(TimeoutException.class, () -> fetchOffsetsFuture.get(5L, TimeUnit.MILLISECONDS));
    }

    /**
     * Fetches the earliest offsets for two partitions that are both mapped to {@code LEADER_1} and checks
     * that a single request is queued to send, then delegates to
     * {@code verifySuccessfulPollAndResponseReceived} with offset 5 / timestamp -1 expected for each
     * partition.
     *
     * @throws ExecutionException   declared because the verification helper may read the future
     * @throws InterruptedException declared because the verification helper may read the future
     */
    @Test
    public void testListOffsetsRequestMultiplePartitions() throws ExecutionException, InterruptedException {
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
        timestampsToSearch.put(TEST_PARTITION_2, ListOffsetsRequest.EARLIEST_TIMESTAMP);

        Map<TopicPartition, Node> partitionLeaders = new HashMap<>();
        partitionLeaders.put(TEST_PARTITION_1, LEADER_1);
        partitionLeaders.put(TEST_PARTITION_2, LEADER_1);
        mockSuccessfulRequest(partitionLeaders);

        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
                requestManager.fetchOffsets(timestampsToSearch, false);
        Assertions.assertEquals(1, requestManager.requestsToSend());
        Assertions.assertEquals(0, requestManager.requestsToRetry());

        // Typed stream: every requested partition is expected to resolve to the same offset/timestamp.
        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = timestampsToSearch.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> new OffsetAndTimestampInternal(5L, -1L, Optional.empty())));
        verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, expectedOffsets);
    }

    /**
     * Calls {@code fetchOffsets} with an empty map and checks that nothing is queued to send or retry,
     * before and after a poll, and that the returned future is already completed normally with an empty
     * map.
     *
     * @throws ExecutionException   if reading the completed future fails
     * @throws InterruptedException if reading the completed future is interrupted
     */
    @Test
    public void testListOffsetsRequestEmpty() throws ExecutionException, InterruptedException {
        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
                requestManager.fetchOffsets(Collections.emptyMap(), false);
        Assertions.assertEquals(0, requestManager.requestsToSend());
        Assertions.assertEquals(0, requestManager.requestsToRetry());

        NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        Assertions.assertTrue(pollResult.unsentRequests.isEmpty());
        Assertions.assertEquals(0, requestManager.requestsToRetry());
        Assertions.assertEquals(0, requestManager.requestsToSend());

        Assertions.assertTrue(fetchOffsetsFuture.isDone());
        Assertions.assertFalse(fetchOffsetsFuture.isCompletedExceptionally());
        Assertions.assertTrue(fetchOffsetsFuture.get().isEmpty());
    }

    /**
     * Completes a fetch with the response built by {@code mockUnknownOffsetResponse} (defined outside
     * this view) and checks, via {@code verifyRequestSuccessfullyCompleted}, that the result maps the
     * partition to {@code null}.
     *
     * @throws ExecutionException   declared because the verification helper may read the future
     * @throws InterruptedException declared because the verification helper may read the future
     */
    @Test
    public void testListOffsetsRequestUnknownOffset() throws ExecutionException, InterruptedException {
        Map<TopicPartition, Long> timestampsToSearch =
                Collections.singletonMap(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));

        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
                requestManager.fetchOffsets(timestampsToSearch, false);
        Assertions.assertEquals(1, requestManager.requestsToSend());
        Assertions.assertEquals(0, requestManager.requestsToRetry());

        List<ListOffsetsResponseData.ListOffsetsTopicResponse> topicResponses =
                Collections.singletonList(mockUnknownOffsetResponse(TEST_PARTITION_1));
        NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        verifySuccessfulPollAwaitingResponse(pollResult);
        NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);
        buildClientResponse(unsentRequest, topicResponses).onComplete();

        verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, Collections.singletonMap(TEST_PARTITION_1, null));
    }

    /**
     * Starts a fetch after {@code mockFailedRequest_MissingLeader()} so that it is queued for retry, then
     * re-stubs a successful request, signals {@code onUpdate} and checks that one request becomes ready
     * to send before delegating to {@code verifySuccessfulPollAndResponseReceived}.
     *
     * @throws ExecutionException   declared because the verification helper may read the future
     * @throws InterruptedException declared because the verification helper may read the future
     */
    @Test
    public void testListOffsetsWaitingForMetadataUpdate_RetrySucceeds() throws ExecutionException, InterruptedException {
        Map<TopicPartition, Long> timestampsToSearch =
                Collections.singletonMap(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
        mockFailedRequest_MissingLeader();

        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
                requestManager.fetchOffsets(timestampsToSearch, false);
        Assertions.assertEquals(0, requestManager.requestsToSend());
        Assertions.assertEquals(1, requestManager.requestsToRetry());
        Mockito.verify(metadata).requestUpdate(true);

        NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, pollResult.unsentRequests.size());
        Assertions.assertFalse(fetchOffsetsFuture.isDone());

        // Re-stub a successful request, then notify the manager so the parked request is retried.
        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
        requestManager.onUpdate(new ClusterResource(""));
        Assertions.assertEquals(1, requestManager.requestsToSend());

        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = Collections.singletonMap(
                TEST_PARTITION_1, new OffsetAndTimestampInternal(5L, -1L, Optional.empty()));
        verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, expectedOffsets);
    }

    /**
     * For each error supplied by the {@code retriableErrors} method source: answers the first request
     * with that error, checks that the fetch stays incomplete, is queued for retry and triggers
     * {@code requestUpdate(false)}, then signals {@code onUpdate} and verifies the retried request
     * completes through {@code verifySuccessfulPollAndResponseReceived}.
     *
     * @param errors the error code placed in the first response for {@code TEST_PARTITION_1}
     * @throws ExecutionException   declared because the verification helper may read the future
     * @throws InterruptedException declared because the verification helper may read the future
     */
    @ParameterizedTest
    @MethodSource({"retriableErrors"})
    public void testRequestFailsWithRetriableError_RetrySucceeds(Errors errors) throws ExecutionException, InterruptedException {
        Map<TopicPartition, Long> timestampsToSearch =
                Collections.singletonMap(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));

        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
                requestManager.fetchOffsets(timestampsToSearch, false);
        Assertions.assertEquals(1, requestManager.requestsToSend());
        Assertions.assertEquals(0, requestManager.requestsToRetry());

        NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        verifySuccessfulPollAwaitingResponse(pollResult);
        Assertions.assertFalse(fetchOffsetsFuture.isDone());

        // Deliver the error response for the first attempt.
        NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);
        buildClientResponseWithErrors(unsentRequest, Collections.singletonMap(TEST_PARTITION_1, errors)).onComplete();
        Assertions.assertFalse(fetchOffsetsFuture.isDone());
        Assertions.assertEquals(1, requestManager.requestsToRetry());
        Assertions.assertEquals(0, requestManager.requestsToSend());
        Mockito.verify(metadata).requestUpdate(false);

        // Re-stub a successful request, then notify the manager so the parked request is retried.
        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
        requestManager.onUpdate(new ClusterResource(""));
        Assertions.assertEquals(1, requestManager.requestsToSend());

        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = Collections.singletonMap(
                TEST_PARTITION_1, new OffsetAndTimestampInternal(5L, -1L, Optional.empty()));
        verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, expectedOffsets);
    }

    /**
     * Runs the shared scenario in {@code testResponseWithErrorCodeAndUnknownOffsets} with
     * {@link Errors#UNSUPPORTED_FOR_MESSAGE_FORMAT} as the response error; that helper asserts the fetch
     * completes with a {@code null} value for the partition.
     */
    @Test
    public void testRequestNotSupportedErrorReturnsNullOffset() throws ExecutionException, InterruptedException {
        testResponseWithErrorCodeAndUnknownOffsets(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT);
    }

    /**
     * Runs the shared scenario in {@code testResponseWithErrorCodeAndUnknownOffsets} with
     * {@link Errors#NONE} as the response error; that helper asserts the fetch completes with a
     * {@code null} value for the partition.
     */
    @Test
    public void testRequestWithUnknownOffsetInResponseReturnsNullOffset() throws ExecutionException, InterruptedException {
        testResponseWithErrorCodeAndUnknownOffsets(Errors.NONE);
    }

    /**
     * Shared scenario: sends one fetch for {@code TEST_PARTITION_1}, answers it through
     * {@code buildClientResponseWithErrors} with the given error code, and checks via
     * {@code verifyRequestSuccessfullyCompleted} that the result maps the partition to {@code null}.
     *
     * @param errors the error code placed in the response for {@code TEST_PARTITION_1}
     * @throws ExecutionException   declared because the verification helper may read the future
     * @throws InterruptedException declared because the verification helper may read the future
     */
    private void testResponseWithErrorCodeAndUnknownOffsets(Errors errors) throws ExecutionException, InterruptedException {
        Map<TopicPartition, Long> timestampsToSearch =
                Collections.singletonMap(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));

        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
                requestManager.fetchOffsets(timestampsToSearch, false);
        Assertions.assertEquals(1, requestManager.requestsToSend());
        Assertions.assertEquals(0, requestManager.requestsToRetry());

        NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        verifySuccessfulPollAwaitingResponse(pollResult);
        Assertions.assertFalse(fetchOffsetsFuture.isDone());

        NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);
        buildClientResponseWithErrors(unsentRequest, Collections.singletonMap(TEST_PARTITION_1, errors)).onComplete();

        verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, Collections.singletonMap(TEST_PARTITION_1, null));
    }

    /**
     * Fetches offsets for two partitions mapped to different leaders, so two requests are queued. The
     * first unsent request is answered successfully for {@code TEST_PARTITION_1} and the second with
     * {@link Errors#UNKNOWN_LEADER_EPOCH} for {@code TEST_PARTITION_2}; the test then checks that one
     * request is queued for retry, signals {@code onUpdate}, answers the retried request successfully
     * and verifies that the future completes with the offsets expected for both partitions.
     *
     * @throws ExecutionException   declared because the verification helper may read the future
     * @throws InterruptedException declared because the verification helper may read the future
     */
    @Test
    public void testRequestPartiallyFailsWithRetriableError_RetrySucceeds() throws ExecutionException, InterruptedException {
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
        timestampsToSearch.put(TEST_PARTITION_2, ListOffsetsRequest.EARLIEST_TIMESTAMP);

        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = timestampsToSearch.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> new OffsetAndTimestampInternal(5L, -1L, Optional.empty())));

        Map<TopicPartition, Node> partitionLeaders = new HashMap<>();
        partitionLeaders.put(TEST_PARTITION_1, LEADER_1);
        partitionLeaders.put(TEST_PARTITION_2, LEADER_2);
        mockSuccessfulRequest(partitionLeaders);

        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
                requestManager.fetchOffsets(timestampsToSearch, false);
        Assertions.assertEquals(2, requestManager.requestsToSend());
        Assertions.assertEquals(0, requestManager.requestsToRetry());

        NetworkClientDelegate.PollResult firstPoll = requestManager.poll(time.milliseconds());
        verifySuccessfulPollAwaitingResponse(firstPoll, 2);
        Assertions.assertFalse(fetchOffsetsFuture.isDone());

        // First request succeeds for partition 1; second fails with a retriable error for partition 2.
        buildClientResponse(
                firstPoll.unsentRequests.get(0),
                Collections.singletonMap(
                        TEST_PARTITION_1,
                        new OffsetAndTimestampInternal(expectedOffsets.get(TEST_PARTITION_1).offset(), -1L, Optional.empty())))
                .onComplete();
        buildClientResponseWithErrors(
                firstPoll.unsentRequests.get(1),
                Collections.singletonMap(TEST_PARTITION_2, Errors.UNKNOWN_LEADER_EPOCH))
                .onComplete();
        Assertions.assertFalse(fetchOffsetsFuture.isDone());
        Assertions.assertEquals(1, requestManager.requestsToRetry());
        Assertions.assertEquals(0, requestManager.requestsToSend());
        Mockito.verify(metadata).requestUpdate(false);

        // Re-stub the leaders, then notify the manager so the failed part is retried.
        mockSuccessfulRequest(partitionLeaders);
        requestManager.onUpdate(new ClusterResource(""));
        Assertions.assertEquals(1, requestManager.requestsToSend());

        NetworkClientDelegate.PollResult retryPoll = requestManager.poll(time.milliseconds());
        verifySuccessfulPollAwaitingResponse(retryPoll);
        buildClientResponse(
                retryPoll.unsentRequests.get(0),
                Collections.singletonMap(
                        TEST_PARTITION_2,
                        new OffsetAndTimestampInternal(expectedOffsets.get(TEST_PARTITION_2).offset(), -1L, Optional.empty())))
                .onComplete();

        verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, expectedOffsets);
    }

    /**
     * Answers a fetch with {@link Errors#TOPIC_AUTHORIZATION_FAILED} and checks, via
     * {@code verifyRequestCompletedWithErrorResponse}, that the future fails with
     * {@link TopicAuthorizationException} and that nothing is left to retry or send.
     */
    @Test
    public void testRequestFailedResponse_NonRetriableAuthError() {
        Map<TopicPartition, Long> timestampsToSearch =
                Collections.singletonMap(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));

        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
                requestManager.fetchOffsets(timestampsToSearch, false);
        Assertions.assertEquals(1, requestManager.requestsToSend());
        Assertions.assertEquals(0, requestManager.requestsToRetry());

        NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        verifySuccessfulPollAwaitingResponse(pollResult);
        NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);
        // NOTE(review): the error is keyed by TEST_PARTITION_2 although the request was made for
        // TEST_PARTITION_1 — confirm this is intentional.
        buildClientResponseWithErrors(
                unsentRequest,
                Collections.singletonMap(TEST_PARTITION_2, Errors.TOPIC_AUTHORIZATION_FAILED))
                .onComplete();

        verifyRequestCompletedWithErrorResponse(fetchOffsetsFuture, TopicAuthorizationException.class);
        Assertions.assertEquals(0, requestManager.requestsToRetry());
        Assertions.assertEquals(0, requestManager.requestsToSend());
    }

    /**
     * Answers a fetch with {@link Errors#BROKER_NOT_AVAILABLE} and checks that the future is left
     * incomplete (a bounded {@code get} times out) while nothing is queued to retry or send.
     */
    @Test
    public void testRequestFailedResponse_NonRetriableErrorTimeout() {
        Map<TopicPartition, Long> timestampsToSearch =
                Collections.singletonMap(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));

        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
                requestManager.fetchOffsets(timestampsToSearch, false);
        Assertions.assertEquals(1, requestManager.requestsToSend());
        Assertions.assertEquals(0, requestManager.requestsToRetry());

        NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        verifySuccessfulPollAwaitingResponse(pollResult);
        NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);
        // NOTE(review): the error is keyed by TEST_PARTITION_2 although the request was made for
        // TEST_PARTITION_1 — confirm this is intentional.
        buildClientResponseWithErrors(
                unsentRequest,
                Collections.singletonMap(TEST_PARTITION_2, Errors.BROKER_NOT_AVAILABLE))
                .onComplete();

        Assertions.assertFalse(fetchOffsetsFuture.isDone());
        Assertions.assertThrows(TimeoutException.class, () -> fetchOffsetsFuture.get(5L, TimeUnit.MILLISECONDS));
        Assertions.assertEquals(0, requestManager.requestsToRetry());
        Assertions.assertEquals(0, requestManager.requestsToSend());
    }

    /**
     * Completes the pending request with a client response carrying an {@link AuthenticationException}
     * and checks that the fetch future fails with an {@link ExecutionException} whose cause is exactly
     * that exception type, leaving nothing to retry or send.
     */
    @Test
    public void testRequestFails_AuthenticationException() {
        Map<TopicPartition, Long> timestampsToSearch =
                Collections.singletonMap(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));

        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
                requestManager.fetchOffsets(timestampsToSearch, false);
        Assertions.assertEquals(1, requestManager.requestsToSend());
        Assertions.assertEquals(0, requestManager.requestsToRetry());

        NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        verifySuccessfulPollAwaitingResponse(pollResult);
        NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);
        buildClientResponse(
                unsentRequest,
                Collections.emptyList(),
                false,
                new AuthenticationException("Authentication failed"))
                .onComplete();

        Assertions.assertTrue(fetchOffsetsFuture.isCompletedExceptionally());
        ExecutionException failure = Assertions.assertThrows(ExecutionException.class, fetchOffsetsFuture::get);
        Assertions.assertEquals(AuthenticationException.class, failure.getCause().getClass());
        Assertions.assertEquals(0, requestManager.requestsToRetry());
        Assertions.assertEquals(0, requestManager.requestsToSend());
    }

    /**
     * With {@code partitionsNeedingReset} stubbed to return an empty set, checks that
     * {@code resetPositionsIfNeeded} queues no request.
     */
    @Test
    public void testResetPositionsSendNoRequestIfNoPartitionsNeedingReset() {
        long now = time.milliseconds();
        Mockito.when(subscriptionState.partitionsNeedingReset(now))
                .thenReturn(Collections.emptySet());

        requestManager.resetPositionsIfNeeded();

        Assertions.assertEquals(0, requestManager.requestsToSend());
    }

    /**
     * After {@code mockFailedRequest_MissingLeader()} (defined outside this view), with one partition
     * stubbed as needing reset using the EARLIEST strategy, checks that {@code resetPositionsIfNeeded}
     * requests a metadata update with {@code true} and queues no request.
     */
    @Test
    public void testResetPositionsMissingLeader() {
        mockFailedRequest_MissingLeader();
        long now = time.milliseconds();
        Mockito.when(subscriptionState.partitionsNeedingReset(now))
                .thenReturn(Collections.singleton(TEST_PARTITION_1));
        Mockito.when(subscriptionState.resetStrategy(ArgumentMatchers.any()))
                .thenReturn(AutoOffsetResetStrategy.EARLIEST);

        requestManager.resetPositionsIfNeeded();

        Mockito.verify(metadata).requestUpdate(true);
        Assertions.assertEquals(0, requestManager.requestsToSend());
    }

    /**
     * Runs {@code testResetPositionsSuccessWithLeaderEpoch} (defined outside this view) with
     * {@code Metadata.LeaderAndEpoch.noLeaderOrEpoch()} and then checks that
     * {@code updateLastSeenEpochIfNewer} was never called on the metadata mock.
     */
    @Test
    public void testResetPositionsSuccess_NoLeaderEpochInResponse() {
        Metadata.LeaderAndEpoch noLeaderOrEpoch = Metadata.LeaderAndEpoch.noLeaderOrEpoch();
        testResetPositionsSuccessWithLeaderEpoch(noLeaderOrEpoch);

        Mockito.verify(metadata, Mockito.never())
                .updateLastSeenEpochIfNewer(ArgumentMatchers.any(), ArgumentMatchers.anyInt());
    }

    /**
     * Runs {@code testResetPositionsSuccessWithLeaderEpoch} (defined outside this view) with leader
     * {@code LEADER_1} at epoch 5 and then checks that the metadata mock was asked to update the last
     * seen epoch of {@code TEST_PARTITION_1} to that epoch.
     */
    @Test
    public void testResetPositionsSuccess_LeaderEpochInResponse() {
        Metadata.LeaderAndEpoch leaderWithEpoch =
                new Metadata.LeaderAndEpoch(Optional.of(LEADER_1), Optional.of(5));
        testResetPositionsSuccessWithLeaderEpoch(leaderWithEpoch);

        int expectedEpoch = leaderWithEpoch.epoch.get();
        Mockito.verify(metadata).updateLastSeenEpochIfNewer(TEST_PARTITION_1, expectedEpoch);
    }

    /**
     * Answers a reset-positions request with {@link Errors#TOPIC_AUTHORIZATION_FAILED} and checks that
     * the request future and the reset future both complete, that the request future does not complete
     * exceptionally, and that {@code requestFailed} and {@code requestUpdate(false)} are invoked. A second
     * call to {@code resetPositionsIfNeeded} must not throw, must queue no request, and must return a
     * future that fails with {@link TopicAuthorizationException}.
     */
    @Test
    public void testResetOffsetsAuthorizationFailure() {
        Mockito.when(subscriptionState.partitionsNeedingReset(time.milliseconds()))
                .thenReturn(Collections.singleton(TEST_PARTITION_1));
        Mockito.when(subscriptionState.resetStrategy(ArgumentMatchers.any()))
                .thenReturn(AutoOffsetResetStrategy.EARLIEST);
        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));

        // The future's result type is not inspected by this test, hence the wildcard.
        CompletableFuture<?> resetResult = requestManager.resetPositionsIfNeeded();
        NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);
        Assertions.assertFalse(resetResult.isDone());

        buildClientResponseWithErrors(
                unsentRequest,
                Collections.singletonMap(TEST_PARTITION_1, Errors.TOPIC_AUTHORIZATION_FAILED))
                .onComplete();
        Assertions.assertTrue(unsentRequest.future().isDone());
        Assertions.assertTrue(resetResult.isDone());
        Assertions.assertFalse(unsentRequest.future().isCompletedExceptionally());
        Mockito.verify(subscriptionState).requestFailed(ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        Mockito.verify(metadata).requestUpdate(false);

        // The failure surfaces through the future returned by the next reset attempt, not as a throw.
        CompletableFuture<?> nextReset =
                Assertions.assertDoesNotThrow(() -> requestManager.resetPositionsIfNeeded());
        Assertions.assertEquals(0, requestManager.requestsToSend());
        Assertions.assertTrue(nextReset.isCompletedExceptionally());
        TestUtils.assertFutureThrows(nextReset, TopicAuthorizationException.class);
    }

    /**
     * Triggers position validation for {@code TEST_PARTITION_1}, checks that one request is queued and
     * that {@code setNextAllowedRetry} is called, then answers the request through
     * {@code buildOffsetsForLeaderEpochResponse} and checks that the request future completes normally
     * and {@code maybeCompleteValidation} is invoked on the subscription state.
     */
    @Test
    public void testValidatePositionsSuccess() {
        Metadata.LeaderAndEpoch currentLeader =
                new Metadata.LeaderAndEpoch(Optional.of(LEADER_1), Optional.of(3));
        TopicPartition partition = TEST_PARTITION_1;
        SubscriptionState.FetchPosition position =
                new SubscriptionState.FetchPosition(5, Optional.of(10), currentLeader);
        mockSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);

        requestManager.validatePositionsIfNeeded();
        Assertions.assertEquals(1, requestManager.requestsToSend(), "Invalid request count");
        Mockito.verify(subscriptionState)
                .setNextAllowedRetry(ArgumentMatchers.any(), ArgumentMatchers.anyLong());

        Mockito.when(metadata.currentLeader(partition))
                .thenReturn(testLeaderEpoch(LEADER_1, currentLeader.epoch));
        NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);
        buildOffsetsForLeaderEpochResponse(unsentRequest, Collections.singletonList(partition), 100).onComplete();

        Assertions.assertTrue(unsentRequest.future().isDone());
        Assertions.assertFalse(unsentRequest.future().isCompletedExceptionally());
        Mockito.verify(subscriptionState)
                .maybeCompleteValidation(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * Stubs a position whose current leader is {@code Node.noNode()} and checks that
     * {@code validatePositionsIfNeeded} requests a metadata update with {@code true} and queues no
     * request.
     */
    @Test
    public void testValidatePositionsMissingLeader() {
        Metadata.LeaderAndEpoch missingLeader =
                new Metadata.LeaderAndEpoch(Optional.of(Node.noNode()), Optional.of(5));
        SubscriptionState.FetchPosition position =
                new SubscriptionState.FetchPosition(5L, Optional.of(10), missingLeader);
        Mockito.when(subscriptionState.partitionsNeedingValidation(time.milliseconds()))
                .thenReturn(Collections.singleton(TEST_PARTITION_1));
        // Return the same position on the first and on every later call.
        Mockito.when(subscriptionState.position(ArgumentMatchers.any()))
                .thenReturn(position, position);
        Mockito.when(apiVersions.get(LEADER_1.idString()))
                .thenReturn(NodeApiVersions.create());

        requestManager.validatePositionsIfNeeded();

        Mockito.verify(metadata).requestUpdate(true);
        Assertions.assertEquals(0, requestManager.requestsToSend());
    }

    /**
     * Answers a validation request with {@link Errors#TOPIC_AUTHORIZATION_FAILED} and checks that the
     * request future completes normally, that the next {@code validatePositionsIfNeeded} call throws
     * {@link TopicAuthorizationException}, and that no request is queued afterwards.
     */
    @Test
    public void testValidatePositionsFailureWithUnrecoverableAuthException() {
        Metadata.LeaderAndEpoch currentLeader =
                new Metadata.LeaderAndEpoch(Optional.of(LEADER_1), Optional.of(5));
        SubscriptionState.FetchPosition position =
                new SubscriptionState.FetchPosition(5L, Optional.of(10), currentLeader);
        mockSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);

        requestManager.validatePositionsIfNeeded();
        NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);
        buildOffsetsForLeaderEpochResponseWithErrors(
                unsentRequest,
                Collections.singletonMap(TEST_PARTITION_1, Errors.TOPIC_AUTHORIZATION_FAILED))
                .onComplete();

        Assertions.assertTrue(unsentRequest.future().isDone());
        Assertions.assertFalse(unsentRequest.future().isCompletedExceptionally());
        Assertions.assertThrows(TopicAuthorizationException.class, () -> requestManager.validatePositionsIfNeeded());
        Assertions.assertEquals(0, requestManager.requestsToSend());
    }

    /**
     * With {@code apiVersions.get} stubbed to return {@code null} for the leader, checks that validation
     * queues no request and never calls {@code completeValidation} or {@code setNextAllowedRetry}. After
     * re-stubbing {@code apiVersions.get} to return {@code NodeApiVersions.create()}, a second validation
     * attempt must queue exactly one request.
     */
    @Test
    public void testValidatePositionsAbortIfNoApiVersionsToCheckAgainstThenRecovers() {
        Metadata.LeaderAndEpoch currentLeader =
                new Metadata.LeaderAndEpoch(Optional.of(LEADER_1), Optional.of(3));
        SubscriptionState.FetchPosition position =
                new SubscriptionState.FetchPosition(5, Optional.of(10), currentLeader);
        Mockito.when(subscriptionState.partitionsNeedingValidation(time.milliseconds()))
                .thenReturn(Collections.singleton(TEST_PARTITION_1));
        Mockito.when(subscriptionState.position(ArgumentMatchers.any()))
                .thenReturn(position, position);
        // No API version information is available for the leader yet.
        Mockito.when(apiVersions.get(LEADER_1.idString())).thenReturn(null);

        requestManager.validatePositionsIfNeeded();
        Assertions.assertEquals(0, requestManager.requestsToSend(), "Invalid request count");
        Mockito.verify(subscriptionState, Mockito.never()).completeValidation(TEST_PARTITION_1);
        Mockito.verify(subscriptionState, Mockito.never())
                .setNextAllowedRetry(ArgumentMatchers.any(), ArgumentMatchers.anyLong());

        // API versions become available; the same partition still needs validation.
        Mockito.when(apiVersions.get(LEADER_1.idString())).thenReturn(NodeApiVersions.create());
        Mockito.when(subscriptionState.partitionsNeedingValidation(time.milliseconds()))
                .thenReturn(Collections.singleton(TEST_PARTITION_1));
        Mockito.when(subscriptionState.position(ArgumentMatchers.any()))
                .thenReturn(position, position);

        requestManager.validatePositionsIfNeeded();
        Assertions.assertEquals(1, requestManager.requestsToSend(), "Invalid request count");
    }

    /**
     * Checks that {@code updateFetchPositions} asks the commit request manager for committed offsets and
     * stays incomplete until that future completes, after which the committed offset is applied through
     * {@code seekUnvalidated} with the stubbed leader/epoch.
     */
    @Test
    public void testUpdatePositionsWithCommittedOffsets() {
        // NOTE(review): the 500 ms added here is presumed to be the default API timeout (all three
        // timing constants are 500) — confirm.
        long fetchCommittedDeadline = time.milliseconds() + DEFAULT_API_TIMEOUT_MS;
        TopicPartition partition = new TopicPartition("topic1", 1);
        Set<TopicPartition> initializingPartitions = Collections.singleton(partition);
        Metadata.LeaderAndEpoch leaderAndEpoch = testLeaderEpoch(LEADER_1, Optional.of(1));
        mockAssignedPartitionsMissingPositions(initializingPartitions, initializingPartitions, leaderAndEpoch);

        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchCommittedFuture = new CompletableFuture<>();
        Mockito.when(commitRequestManager.fetchOffsets(initializingPartitions, fetchCommittedDeadline))
                .thenReturn(fetchCommittedFuture);

        // The future's result type is not inspected by this test, hence the wildcard.
        CompletableFuture<?> updatePositions = requestManager.updateFetchPositions(time.milliseconds());
        Assertions.assertFalse(updatePositions.isDone(), "Update positions should wait for the OffsetFetch request");
        Mockito.verify(commitRequestManager).fetchOffsets(initializingPartitions, fetchCommittedDeadline);

        Mockito.when(subscriptionState.initializingPartitions()).thenReturn(initializingPartitions);
        OffsetAndMetadata committedOffset = new OffsetAndMetadata(10L, Optional.of(1), "");
        fetchCommittedFuture.complete(Collections.singletonMap(partition, committedOffset));

        Assertions.assertTrue(updatePositions.isDone(), "Update positions should complete after the OffsetFetch response");
        SubscriptionState.FetchPosition expectedPosition = new SubscriptionState.FetchPosition(
                committedOffset.offset(), committedOffset.leaderEpoch(), leaderAndEpoch);
        Mockito.verify(subscriptionState).seekUnvalidated(partition, expectedPosition);
    }

    /**
     * Calls {@code updateFetchPositions} twice while the committed-offsets future is still pending and
     * checks that the commit request manager is asked only once. Completing that future must complete
     * both update futures and apply the committed offset through {@code seekUnvalidated} exactly once.
     */
    @Test
    public void testUpdatePositionsWithCommittedOffsetsReusesRequest() {
        // NOTE(review): the 500 ms added here is presumed to be the default API timeout (all three
        // timing constants are 500) — confirm.
        long fetchCommittedDeadline = time.milliseconds() + DEFAULT_API_TIMEOUT_MS;
        TopicPartition partition = new TopicPartition("topic1", 1);
        Set<TopicPartition> initializingPartitions = Collections.singleton(partition);
        Metadata.LeaderAndEpoch leaderAndEpoch = testLeaderEpoch(LEADER_1, Optional.of(1));
        mockAssignedPartitionsMissingPositions(initializingPartitions, initializingPartitions, leaderAndEpoch);

        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchCommittedFuture = new CompletableFuture<>();
        Mockito.when(commitRequestManager.fetchOffsets(initializingPartitions, fetchCommittedDeadline))
                .thenReturn(fetchCommittedFuture);

        // The futures' result type is not inspected by this test, hence the wildcards.
        CompletableFuture<?> firstUpdate = requestManager.updateFetchPositions(time.milliseconds());
        Assertions.assertFalse(firstUpdate.isDone(), "Update positions should wait for the OffsetFetch request");
        Mockito.verify(commitRequestManager).fetchOffsets(initializingPartitions, fetchCommittedDeadline);
        Mockito.clearInvocations(commitRequestManager);

        // Second attempt while the first fetch is still in flight: no new fetch may be issued.
        CompletableFuture<?> secondUpdate = requestManager.updateFetchPositions(time.milliseconds());
        Mockito.verify(commitRequestManager, Mockito.never())
                .fetchOffsets(initializingPartitions, fetchCommittedDeadline);

        OffsetAndMetadata committedOffset = new OffsetAndMetadata(10L, Optional.of(1), "");
        fetchCommittedFuture.complete(Collections.singletonMap(partition, committedOffset));

        Assertions.assertTrue(firstUpdate.isDone());
        Assertions.assertTrue(secondUpdate.isDone());
        SubscriptionState.FetchPosition expectedPosition = new SubscriptionState.FetchPosition(
                committedOffset.offset(), committedOffset.leaderEpoch(), leaderAndEpoch);
        Mockito.verify(subscriptionState).seekUnvalidated(partition, expectedPosition);
    }

    /**
     * Completes the committed-offsets future after {@code initializingPartitions()} has been stubbed to
     * return an empty set and checks that {@code seekUnvalidated} is never called.
     */
    @Test
    public void testUpdatePositionsDoesNotApplyOffsetsIfPartitionNotInitializingAnymore() {
        // NOTE(review): the 500 ms added here is presumed to be the default API timeout (all three
        // timing constants are 500) — confirm.
        long fetchCommittedDeadline = time.milliseconds() + DEFAULT_API_TIMEOUT_MS;
        TopicPartition partition = new TopicPartition("topic1", 1);
        Set<TopicPartition> initializingPartitions = Collections.singleton(partition);
        Metadata.LeaderAndEpoch leaderAndEpoch = testLeaderEpoch(LEADER_1, Optional.of(1));
        mockAssignedPartitionsMissingPositions(initializingPartitions, initializingPartitions, leaderAndEpoch);

        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchCommittedFuture = new CompletableFuture<>();
        Mockito.when(commitRequestManager.fetchOffsets(initializingPartitions, fetchCommittedDeadline))
                .thenReturn(fetchCommittedFuture);

        Assertions.assertFalse(requestManager.updateFetchPositions(time.milliseconds()).isDone());
        Mockito.verify(commitRequestManager).fetchOffsets(initializingPartitions, fetchCommittedDeadline);
        Mockito.clearInvocations(commitRequestManager);

        // The partition stops initializing before the committed offsets arrive.
        Mockito.when(subscriptionState.initializingPartitions()).thenReturn(Collections.emptySet());
        fetchCommittedFuture.complete(Collections.singletonMap(partition, new OffsetAndMetadata(5L)));

        Mockito.verify(subscriptionState, Mockito.never())
                .seekUnvalidated(ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * Verifies that when a second partition is added while an OffsetFetch for the first
     * partition is still pending, completing that fetch seeks the first partition to the
     * fetched offset and calls {@code resetInitializingPositions} with a predicate that
     * rejects the newly added partition.
     */
    @Test
    public void testUpdatePositionsDoesNotResetPositionBeforeRetrievingOffsetsForNewlyAddedPartition() {
        final long expectedTimeMs = time.milliseconds() + 500;
        final TopicPartition firstPartition = new TopicPartition("topic1", 1);
        final Set<TopicPartition> initialPartitions = Collections.singleton(firstPartition);
        final Metadata.LeaderAndEpoch leaderAndEpoch = testLeaderEpoch(LEADER_1, Optional.of(1));
        mockAssignedPartitionsMissingPositions(initialPartitions, initialPartitions, leaderAndEpoch);

        // Keep the OffsetFetch for the first partition pending.
        final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> pendingFetch = new CompletableFuture<>();
        Mockito.when(commitRequestManager.fetchOffsets(initialPartitions, expectedTimeMs)).thenReturn(pendingFetch);
        Assertions.assertFalse(requestManager.updateFetchPositions(time.milliseconds()).isDone());
        Mockito.verify(commitRequestManager).fetchOffsets(initialPartitions, expectedTimeMs);
        Mockito.clearInvocations(commitRequestManager);

        // A second partition shows up before the fetch completes. The helper also stubs
        // initializingPartitions() with its second argument, so no extra stub is needed.
        final TopicPartition addedPartition = new TopicPartition("topic2", 2);
        final Set<TopicPartition> expandedPartitions = new HashSet<>(Arrays.asList(firstPartition, addedPartition));
        mockAssignedPartitionsMissingPositions(expandedPartitions, expandedPartitions, leaderAndEpoch);

        final OffsetAndMetadata fetched = new OffsetAndMetadata(10L, Optional.empty(), "");
        pendingFetch.complete(Collections.singletonMap(firstPartition, fetched));

        Mockito.verify(subscriptionState).seekUnvalidated(
            firstPartition,
            new SubscriptionState.FetchPosition(fetched.offset(), fetched.leaderEpoch(), leaderAndEpoch));
        // Explicit type witness: without it the matcher lambda's parameter would be inferred
        // as Object (cast context) and test(...) could not be resolved.
        Mockito.verify(subscriptionState).resetInitializingPositions(
            ArgumentMatchers.<Predicate<TopicPartition>>argThat(filter -> !filter.test(addedPartition)));
    }

    /**
     * Builds a request manager with distinct numeric settings and checks that the
     * ListOffsets request it produces carries the expected {@code timeoutMs()}.
     */
    @Test
    public void testRemoteListOffsetsRequestTimeoutMs() {
        // The test expects this constructor argument to surface as the built request's timeoutMs().
        final int expectedTimeoutMs = 100;
        this.requestManager = new OffsetsRequestManager(
            subscriptionState,
            metadata,
            DEFAULT_ISOLATION_LEVEL,
            time,
            500L,
            expectedTimeoutMs,
            500,
            apiVersions,
            Mockito.mock(NetworkClientDelegate.class),
            commitRequestManager,
            new LogContext());

        final Map<TopicPartition, Long> timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1, -2L);
        mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
        requestManager.fetchOffsets(timestampsToSearch, false);
        Assertions.assertEquals(1, requestManager.requestsToSend());

        final NetworkClientDelegate.UnsentRequest unsent = requestManager.poll(time.milliseconds()).unsentRequests.get(0);
        // assertInstanceOf returns the value already cast, so no unchecked downcast is needed.
        final ListOffsetsRequest request =
            Assertions.assertInstanceOf(ListOffsetsRequest.class, unsent.requestBuilder().build());
        Assertions.assertEquals(expectedTimeoutMs, request.timeoutMs());
    }

    /**
     * Stubs the subscription state and metadata mocks so that the given partitions are
     * assigned while fetch positions are reported as incomplete.
     *
     * @param set            partitions stubbed as assigned, each with {@code leaderAndEpoch} as current leader
     * @param set2           value returned by {@code subscriptionState.initializingPartitions()}
     * @param leaderAndEpoch value returned by {@code metadata.currentLeader(...)} for every partition in {@code set}
     */
    private void mockAssignedPartitionsMissingPositions(Set<TopicPartition> set, Set<TopicPartition> set2, Metadata.LeaderAndEpoch leaderAndEpoch) {
        // No partition needs validation, whatever timestamp is passed.
        Mockito.when(subscriptionState.partitionsNeedingValidation(ArgumentMatchers.anyLong()))
            .thenReturn(Collections.emptySet());
        for (TopicPartition partition : set) {
            Mockito.when(subscriptionState.isAssigned(partition)).thenReturn(true);
            Mockito.when(metadata.currentLeader(partition)).thenReturn(leaderAndEpoch);
        }
        Mockito.when(subscriptionState.hasAllFetchPositions()).thenReturn(false);
        Mockito.when(subscriptionState.initializingPartitions()).thenReturn(set2);
    }

    /**
     * Stubs the mocks so that {@code TEST_PARTITION_1} needs validation at the current
     * time, every position lookup returns {@code fetchPosition}, and API versions are
     * known for {@code node}.
     *
     * @param fetchPosition position returned by {@code subscriptionState.position(...)} on every call
     * @param node          node whose id string is used as the key for the API versions stub
     */
    private void mockSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition fetchPosition, Node node) {
        final Set<TopicPartition> needingValidation = Collections.singleton(TEST_PARTITION_1);
        Mockito.when(subscriptionState.partitionsNeedingValidation(time.milliseconds()))
            .thenReturn(needingValidation);
        // Same value for the first and all consecutive calls.
        Mockito.when(subscriptionState.position(ArgumentMatchers.<TopicPartition>any()))
            .thenReturn(fetchPosition, fetchPosition);
        Mockito.when(apiVersions.get(node.idString())).thenReturn(NodeApiVersions.create());
    }

    /**
     * Drives a position reset for {@code TEST_PARTITION_1} through to a successful
     * ListOffsets response and asserts that the request future completed normally.
     *
     * @param leaderAndEpoch supplies the epoch used both for the leader stubbed before polling
     *                       and for the offset returned in the response
     */
    private void testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch leaderAndEpoch) {
        final TopicPartition partition = TEST_PARTITION_1;
        final Node leader = LEADER_1;

        Mockito.when(subscriptionState.partitionsNeedingReset(time.milliseconds()))
            .thenReturn(Collections.singleton(partition));
        Mockito.when(subscriptionState.resetStrategy(ArgumentMatchers.<TopicPartition>any()))
            .thenReturn(AutoOffsetResetStrategy.EARLIEST);
        mockSuccessfulRequest(Collections.singletonMap(partition, leader));

        requestManager.resetPositionsIfNeeded();
        Assertions.assertEquals(1, requestManager.requestsToSend());

        // Re-stub the current leader with the caller's epoch before the request is polled.
        final Metadata.LeaderAndEpoch leaderWithEpoch = testLeaderEpoch(leader, leaderAndEpoch.epoch);
        Mockito.when(metadata.currentLeader(partition)).thenReturn(leaderWithEpoch);

        final NetworkClientDelegate.UnsentRequest unsent = requestManager.poll(time.milliseconds()).unsentRequests.get(0);
        final OffsetAndTimestampInternal offsetInResponse = new OffsetAndTimestampInternal(5L, 1L, leaderAndEpoch.epoch);
        final ClientResponse response = buildClientResponse(unsent, Collections.singletonMap(partition, offsetInResponse));
        response.onComplete();

        Assertions.assertTrue(unsent.future().isDone());
        Assertions.assertFalse(unsent.future().isCompletedExceptionally());
    }

    /**
     * Builds a ListOffsets topic response for the given partition with no error and
     * both timestamp and offset set to {@code -1}.
     *
     * @param topicPartition partition whose topic name and index populate the response
     * @return topic response holding a single partition entry
     */
    private ListOffsetsResponseData.ListOffsetsTopicResponse mockUnknownOffsetResponse(TopicPartition topicPartition) {
        final ListOffsetsResponseData.ListOffsetsPartitionResponse partitionResponse =
            new ListOffsetsResponseData.ListOffsetsPartitionResponse()
                .setPartitionIndex(topicPartition.partition())
                .setErrorCode(Errors.NONE.code())
                .setTimestamp(-1L)
                .setOffset(-1L);
        final ListOffsetsResponseData.ListOffsetsTopicResponse topicResponse =
            new ListOffsetsResponseData.ListOffsetsTopicResponse();
        return topicResponse
            .setName(topicPartition.topic())
            .setPartitions(Collections.singletonList(partitionResponse));
    }

    /**
     * Supplies the error codes listed below as single-argument {@link Arguments},
     * in the order shown.
     *
     * @return stream with one {@code Arguments} per error
     */
    private static Stream<Arguments> retriableErrors() {
        // Passing the Arguments directly keeps the stream typed as Stream<Arguments>;
        // an Object[] cast would produce a Stream<Object> instead.
        return Stream.of(
            Arguments.of(Errors.NOT_LEADER_OR_FOLLOWER),
            Arguments.of(Errors.REPLICA_NOT_AVAILABLE),
            Arguments.of(Errors.KAFKA_STORAGE_ERROR),
            Arguments.of(Errors.OFFSET_NOT_AVAILABLE),
            Arguments.of(Errors.LEADER_NOT_AVAILABLE),
            Arguments.of(Errors.FENCED_LEADER_EPOCH),
            Arguments.of(Errors.BROKER_NOT_AVAILABLE),
            Arguments.of(Errors.INVALID_REQUEST),
            Arguments.of(Errors.UNKNOWN_LEADER_EPOCH),
            Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION));
    }

    /**
     * Polls the request manager, checks that exactly one request is awaiting a response,
     * completes it with a successful response built from {@code map}, and verifies the
     * outcome on {@code completableFuture}.
     *
     * @param completableFuture future expected to complete with {@code map}
     * @param map               offsets used to build the response and as the expected result
     * @throws ExecutionException   propagated from the result verification
     * @throws InterruptedException propagated from the result verification
     */
    private void verifySuccessfulPollAndResponseReceived(CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> completableFuture, Map<TopicPartition, OffsetAndTimestampInternal> map) throws ExecutionException, InterruptedException {
        final NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
        verifySuccessfulPollAwaitingResponse(pollResult);

        final NetworkClientDelegate.UnsentRequest sentRequest = pollResult.unsentRequests.get(0);
        final ClientResponse response = buildClientResponse(sentRequest, map);
        response.onComplete();

        verifyRequestSuccessfullyCompleted(completableFuture, map);
    }

    /**
     * Asserts that the future completed exceptionally and that the cause wrapped in the
     * resulting {@link ExecutionException} is exactly of class {@code cls}.
     *
     * @param completableFuture future expected to have failed
     * @param cls               exact class expected for the failure cause
     */
    private void verifyRequestCompletedWithErrorResponse(CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> completableFuture, Class<? extends Throwable> cls) {
        Assertions.assertTrue(completableFuture.isDone());
        Assertions.assertTrue(completableFuture.isCompletedExceptionally());

        final ExecutionException thrown =
            Assertions.assertThrows(ExecutionException.class, () -> completableFuture.get());
        final Class<?> actualCauseType = thrown.getCause().getClass();
        Assertions.assertEquals(cls, actualCauseType);
    }

    /**
     * Stubs the mocks so that every partition in {@code map} is assigned and has its
     * mapped node as current leader, and so that {@code metadata.fetch()} returns a
     * cluster built from the same mapping.
     *
     * @param map partition-to-leader mapping to expose through the mocks
     */
    private void mockSuccessfulRequest(Map<TopicPartition, Node> map) {
        for (Map.Entry<TopicPartition, Node> entry : map.entrySet()) {
            final TopicPartition partition = entry.getKey();
            // Leader is the mapped node; the epoch is taken from LeaderAndEpoch.noLeaderOrEpoch().
            final Metadata.LeaderAndEpoch leader =
                testLeaderEpoch(entry.getValue(), Metadata.LeaderAndEpoch.noLeaderOrEpoch().epoch);
            Mockito.when(metadata.currentLeader(partition)).thenReturn(leader);
            Mockito.when(subscriptionState.isAssigned(partition)).thenReturn(true);
        }
        final Cluster cluster = testClusterMetadata(map);
        Mockito.when(metadata.fetch()).thenReturn(cluster);
    }

    /**
     * Stubs the mocks so that any partition is assigned but its current leader lookup
     * returns an empty leader with epoch {@code 1}.
     */
    private void mockFailedRequest_MissingLeader() {
        final Metadata.LeaderAndEpoch missingLeader =
            new Metadata.LeaderAndEpoch(Optional.empty(), Optional.of(1));
        Mockito.when(metadata.currentLeader(ArgumentMatchers.any(TopicPartition.class)))
            .thenReturn(missingLeader);
        Mockito.when(subscriptionState.isAssigned(ArgumentMatchers.any(TopicPartition.class)))
            .thenReturn(true);
    }

    /**
     * Convenience overload that expects exactly one unsent request in the poll result.
     *
     * @param pollResult result of the request manager poll to check
     */
    private void verifySuccessfulPollAwaitingResponse(NetworkClientDelegate.PollResult pollResult) {
        final int expectedUnsentRequests = 1;
        this.verifySuccessfulPollAwaitingResponse(pollResult, expectedUnsentRequests);
    }

    /**
     * Asserts that the request manager has nothing left to send or retry and that the
     * poll result carries the expected number of unsent requests.
     *
     * @param pollResult result of the request manager poll to check
     * @param i          expected size of {@code pollResult.unsentRequests}
     */
    private void verifySuccessfulPollAwaitingResponse(NetworkClientDelegate.PollResult pollResult, int i) {
        final int pendingToSend = requestManager.requestsToSend();
        Assertions.assertEquals(0, pendingToSend);

        final int pendingToRetry = requestManager.requestsToRetry();
        Assertions.assertEquals(0, pendingToRetry);

        final int unsentCount = pollResult.unsentRequests.size();
        Assertions.assertEquals(i, unsentCount);
    }

    /**
     * Asserts that nothing is left to retry or send, that the future completed normally
     * with {@code map}, and that the subscription state was updated with the offset of
     * every non-null entry.
     *
     * @param completableFuture future expected to have completed with {@code map}
     * @param map               expected result; entries with a null value are skipped for the
     *                          subscription state check
     * @throws ExecutionException   if reading the future's result fails
     * @throws InterruptedException if reading the future's result is interrupted
     */
    private void verifyRequestSuccessfullyCompleted(CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> completableFuture, Map<TopicPartition, OffsetAndTimestampInternal> map) throws ExecutionException, InterruptedException {
        Assertions.assertEquals(0, requestManager.requestsToRetry());
        Assertions.assertEquals(0, requestManager.requestsToSend());
        Assertions.assertTrue(completableFuture.isDone());
        Assertions.assertFalse(completableFuture.isCompletedExceptionally());
        Assertions.assertEquals(map, completableFuture.get());

        // Collect partition -> offset for the entries that actually carry a result.
        final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndTimestampInternal> entry : map.entrySet()) {
            final OffsetAndTimestampInternal result = entry.getValue();
            if (result != null) {
                expectedOffsets.put(entry.getKey(), result.offset());
            }
        }
        verifySubscriptionStateUpdated(expectedOffsets);
    }

    /**
     * Verifies that {@code subscriptionState.updateLastStableOffset} was invoked once per
     * entry of {@code map}, for exactly the expected partitions, and that every expected
     * offset appears among the captured offsets.
     *
     * @param map expected partition-to-offset updates
     */
    private void verifySubscriptionStateUpdated(Map<TopicPartition, Long> map) {
        final ArgumentCaptor<TopicPartition> partitionCaptor = ArgumentCaptor.forClass(TopicPartition.class);
        final ArgumentCaptor<Long> offsetCaptor = ArgumentCaptor.forClass(Long.class);
        Mockito.verify(subscriptionState, Mockito.times(map.size()))
            .updateLastStableOffset(partitionCaptor.capture(), offsetCaptor.capture());

        final List<TopicPartition> updatedPartitions = partitionCaptor.getAllValues();
        final List<Long> updatedOffsets = offsetCaptor.getAllValues();

        // keySet() and values() both have map.size() elements, so one size check covers both.
        Assertions.assertEquals(map.size(), updatedOffsets.size());
        Assertions.assertEquals(map.keySet(), new HashSet<>(updatedPartitions));

        // Offsets are checked by membership only; pairing with partitions is not asserted here.
        for (Long expectedOffset : map.values()) {
            Assertions.assertTrue(updatedOffsets.contains(expectedOffset));
        }
    }

    /**
     * Creates a {@code LeaderAndEpoch} whose leader is always present.
     *
     * @param node     node to wrap as the leader
     * @param optional leader epoch, passed through unchanged
     * @return the combined leader and epoch
     */
    private Metadata.LeaderAndEpoch testLeaderEpoch(Node node, Optional<Integer> optional) {
        final Optional<Node> leader = Optional.of(node);
        return new Metadata.LeaderAndEpoch(leader, optional);
    }

    /**
     * Builds a {@code Cluster} named {@code "clusterId"} from a partition-to-leader mapping.
     * Each partition gets a {@code PartitionInfo} with its mapped node as leader and null
     * replica arrays; the two trailing topic sets are empty.
     *
     * @param map partition-to-leader mapping; its values are used as the cluster's nodes
     * @return cluster describing the given partitions
     */
    private Cluster testClusterMetadata(Map<TopicPartition, Node> map) {
        final List<PartitionInfo> partitionInfos = new ArrayList<>();
        for (Map.Entry<TopicPartition, Node> entry : map.entrySet()) {
            final TopicPartition partition = entry.getKey();
            partitionInfos.add(
                new PartitionInfo(partition.topic(), partition.partition(), entry.getValue(), null, null));
        }
        return new Cluster("clusterId", map.values(), partitionInfos, Collections.emptySet(), Collections.emptySet());
    }

    /**
     * Builds a successful ListOffsets client response with one topic response per entry
     * of {@code map}. A missing leader epoch is encoded as {@code -1}.
     *
     * @param unsentRequest request the response answers
     * @param map           offset, timestamp and leader epoch to report for each partition
     * @return the client response, built without an authentication error
     */
    private ClientResponse buildClientResponse(NetworkClientDelegate.UnsentRequest unsentRequest, Map<TopicPartition, OffsetAndTimestampInternal> map) {
        final List<ListOffsetsResponseData.ListOffsetsTopicResponse> topicResponses = new ArrayList<>();
        for (Map.Entry<TopicPartition, OffsetAndTimestampInternal> entry : map.entrySet()) {
            final OffsetAndTimestampInternal result = entry.getValue();
            final int leaderEpoch = result.leaderEpoch().orElse(-1);
            topicResponses.add(ListOffsetsResponse.singletonListOffsetsTopicResponse(
                entry.getKey(), Errors.NONE, result.timestamp(), result.offset(), leaderEpoch));
        }
        return buildClientResponse(unsentRequest, topicResponses, false, null);
    }

    /**
     * Builds a successful OffsetsForLeaderEpoch client response for the given partitions.
     * Every partition is reported with no error, leader epoch {@code 3} and end offset
     * {@code i}; partitions of the same topic share one topic result.
     *
     * @param unsentRequest request the response answers; it must build an
     *                      {@link OffsetsForLeaderEpochRequest}, otherwise the assertion fails
     * @param list          partitions to include in the response
     * @param i             end offset reported for every partition
     * @return the client response wrapping the generated response data
     */
    private ClientResponse buildOffsetsForLeaderEpochResponse(NetworkClientDelegate.UnsentRequest unsentRequest, List<TopicPartition> list, int i) {
        // assertInstanceOf returns the value already cast to the asserted type.
        final OffsetsForLeaderEpochRequest request =
            Assertions.assertInstanceOf(OffsetsForLeaderEpochRequest.class, unsentRequest.requestBuilder().build());

        final OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData();
        for (TopicPartition partition : list) {
            // Typed as the concrete topic result so partitions() and topics().add(...) resolve.
            OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topicResult =
                data.topics().find(partition.topic());
            if (topicResult == null) {
                topicResult = new OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult()
                    .setTopic(partition.topic());
                data.topics().add(topicResult);
            }
            topicResult.partitions().add(new OffsetForLeaderEpochResponseData.EpochEndOffset()
                .setPartition(partition.partition())
                .setErrorCode(Errors.NONE.code())
                .setLeaderEpoch(3)
                .setEndOffset(i));
        }

        final RequestHeader header = new RequestHeader(ApiKeys.OFFSET_FOR_LEADER_EPOCH, request.version(), "", 1);
        return new ClientResponse(
            header,
            unsentRequest.handler(),
            "-1",
            time.milliseconds(),
            time.milliseconds(),
            false,
            null,
            null,
            new OffsetsForLeaderEpochResponse(data));
    }

    /**
     * Builds an OffsetsForLeaderEpoch client response in which every partition carries the
     * error code mapped to it; partitions of the same topic share one topic result.
     *
     * @param unsentRequest request the response answers; it must build an
     *                      {@link OffsetsForLeaderEpochRequest}, otherwise the assertion fails
     * @param map           error to report for each partition
     * @return the client response wrapping the generated response data
     */
    private ClientResponse buildOffsetsForLeaderEpochResponseWithErrors(NetworkClientDelegate.UnsentRequest unsentRequest, Map<TopicPartition, Errors> map) {
        // assertInstanceOf returns the value already cast to the asserted type.
        final OffsetsForLeaderEpochRequest request =
            Assertions.assertInstanceOf(OffsetsForLeaderEpochRequest.class, unsentRequest.requestBuilder().build());

        final OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData();
        for (Map.Entry<TopicPartition, Errors> entry : map.entrySet()) {
            final TopicPartition partition = entry.getKey();
            // Typed as the concrete topic result so partitions() and topics().add(...) resolve.
            OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topicResult =
                data.topics().find(partition.topic());
            if (topicResult == null) {
                topicResult = new OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult()
                    .setTopic(partition.topic());
                data.topics().add(topicResult);
            }
            topicResult.partitions().add(new OffsetForLeaderEpochResponseData.EpochEndOffset()
                .setPartition(partition.partition())
                .setErrorCode(entry.getValue().code()));
        }

        final RequestHeader header = new RequestHeader(ApiKeys.OFFSET_FOR_LEADER_EPOCH, request.version(), "", 1);
        return new ClientResponse(
            header,
            unsentRequest.handler(),
            "-1",
            time.milliseconds(),
            time.milliseconds(),
            false,
            null,
            null,
            new OffsetsForLeaderEpochResponse(data));
    }

    /**
     * Builds a ListOffsets client response from ready-made topic responses, with the
     * boolean flag set to {@code false} and no authentication error.
     *
     * @param unsentRequest request the response answers
     * @param list          topic responses to place in the response body
     * @return the client response
     */
    private ClientResponse buildClientResponse(NetworkClientDelegate.UnsentRequest unsentRequest, List<ListOffsetsResponseData.ListOffsetsTopicResponse> list) {
        final AuthenticationException noAuthError = null;
        return this.buildClientResponse(unsentRequest, list, false, noAuthError);
    }

    /**
     * Builds a ListOffsets client response in which every partition reports the error
     * mapped to it, with timestamp, offset and leader epoch all set to {@code -1}.
     *
     * @param unsentRequest request the response answers
     * @param map           error to report for each partition
     * @return the client response, built without an authentication error
     */
    private ClientResponse buildClientResponseWithErrors(NetworkClientDelegate.UnsentRequest unsentRequest, Map<TopicPartition, Errors> map) {
        final List<ListOffsetsResponseData.ListOffsetsTopicResponse> topicResponses = new ArrayList<>();
        for (Map.Entry<TopicPartition, Errors> entry : map.entrySet()) {
            topicResponses.add(
                ListOffsetsResponse.singletonListOffsetsTopicResponse(entry.getKey(), entry.getValue(), -1L, -1L, -1));
        }
        return buildClientResponse(unsentRequest, topicResponses, false, null);
    }

    /**
     * Builds a ListOffsets client response for the given request.
     *
     * @param unsentRequest           request the response answers; it must build a
     *                                {@link ListOffsetsRequest}, otherwise the assertion fails
     * @param list                    topic responses to place in the response body
     * @param z                       forwarded to the boolean flag of the {@code ClientResponse}
     *                                constructor (presumably "disconnected"; confirm against ClientResponse)
     * @param authenticationException authentication error to attach, or {@code null} for none
     * @return the client response with throttle time {@code 0}
     */
    private ClientResponse buildClientResponse(NetworkClientDelegate.UnsentRequest unsentRequest, List<ListOffsetsResponseData.ListOffsetsTopicResponse> list, boolean z, AuthenticationException authenticationException) {
        // assertInstanceOf returns the value already cast to the asserted type.
        final ListOffsetsRequest request =
            Assertions.assertInstanceOf(ListOffsetsRequest.class, unsentRequest.requestBuilder().build());

        // The header names the API of the request that was actually built (ListOffsets),
        // matching how the OffsetsForLeaderEpoch builders pair key and request type.
        final RequestHeader header = new RequestHeader(ApiKeys.LIST_OFFSETS, request.version(), "", 1);
        final ListOffsetsResponseData data = new ListOffsetsResponseData()
            .setThrottleTimeMs(0)
            .setTopics(list);
        return new ClientResponse(
            header,
            unsentRequest.handler(),
            "-1",
            time.milliseconds(),
            time.milliseconds(),
            z,
            null,
            authenticationException,
            new ListOffsetsResponse(data));
    }
}
