package org.apache.kafka.clients.consumer.internals;

import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ByteBufferOutputStream;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/FetcherTest.class */
public class FetcherTest {
    // Tolerance used as the delta when asserting double-valued metric readings.
    private static final double EPSILON = 1.0E-4d;
    // Batch built in setup(): three records appended to a builder whose base offset is 1.
    private MemoryRecords records;
    // Batch built in setup(): two records appended to a builder whose base offset is 4.
    private MemoryRecords nextRecords;
    // No-op listener passed to SubscriptionState.subscribe() by the rebalance test.
    private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
    // Name of the single topic every test works against.
    private String topicName = "test";
    // Consumer group id; only used below to derive the metric group name.
    private String groupId = "test-group";
    // Metric group used when looking up fetcher metrics by name.
    private final String metricGroup = "consumer" + this.groupId + "-fetch-manager-metrics";
    // Partition 0 of the test topic; the partition exercised by the visible tests.
    private TopicPartition tp = new TopicPartition(this.topicName, 0);
    // Partition 1 of the test topic.
    private TopicPartition tp1 = new TopicPartition(this.topicName, 1);
    // Fetch tuning values. NOTE(review): not referenced in the visible part of the class;
    // presumably consumed by createFetcher(...) defined further down - confirm.
    private int minBytes = 1;
    private int maxBytes = Integer.MAX_VALUE;
    private int maxWaitMs = 0;
    private int fetchSize = 1000;
    private long retryBackoffMs = 100;
    // Mock clock shared by the client, metrics and metadata updates.
    // NOTE(review): the constructor argument is presumably an auto-tick in ms - confirm against MockTime.
    private MockTime time = new MockTime(1);
    // NOTE(review): arguments are presumably (refresh backoff, expiry) - confirm against Metadata.
    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
    // Mock network client; tests queue canned responses on it via prepareResponse(...).
    private MockClient client = new MockClient(this.time, this.metadata);
    // Test cluster for the topic. NOTE(review): the 2 is presumably the partition count - confirm.
    private Cluster cluster = TestUtils.singletonCluster(this.topicName, 2);
    // First node of the test cluster; the mock client is pointed at it in setup().
    private Node node = (Node) this.cluster.nodes().get(0);
    // Metrics registry handed to the default fetcher; closed in teardown().
    private Metrics metrics = new Metrics(this.time);
    // Subscription state with the EARLIEST offset reset strategy.
    private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
    // Subscription state with offset reset strategy NONE.
    private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
    // Consumer-side wrapper around the mock client that the tests poll.
    // NOTE(review): 100 and 1000 are presumably retry backoff and request timeout in ms - confirm.
    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(this.client, this.metadata, this.time, 100, 1000);
    // Fetcher under test, bound to `subscriptions` and `metrics`.
    // Field initializers run in declaration order, so everything createFetcher(...) reads
    // must be declared above this line (createFetcher is not visible here - verify).
    private Fetcher<byte[], byte[]> fetcher = createFetcher(this.subscriptions, this.metrics);
    // Separate metrics registry for the no-auto-reset fetcher; closed in teardown().
    private Metrics fetcherMetrics = new Metrics(this.time);
    // Fetcher bound to `subscriptionsNoAutoReset` and `fetcherMetrics`.
    private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(this.subscriptionsNoAutoReset, this.fetcherMetrics);

    /**
     * Prepares the shared fixtures before each test: updates the metadata with the test
     * cluster, points the mock client at the cluster node, and builds the two record
     * batches ({@code records} and {@code nextRecords}) that the fetch tests return.
     */
    @Before
    public void setup() throws Exception {
        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
        client.setNode(node);

        // First batch: three records appended to a builder with base offset 1.
        MemoryRecordsBuilder firstBatch =
                MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
        for (String value : new String[] {"value-1", "value-2", "value-3"}) {
            firstBatch.append(0L, "key".getBytes(), value.getBytes());
        }
        records = firstBatch.build();

        // Second batch: two records appended to a builder with base offset 4.
        MemoryRecordsBuilder secondBatch =
                MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L);
        for (String value : new String[] {"value-4", "value-5"}) {
            secondBatch.append(0L, "key".getBytes(), value.getBytes());
        }
        nextRecords = secondBatch.build();
    }

    /** Closes both metrics registries created for the fetchers after each test. */
    @After
    public void teardown() {
        metrics.close();
        fetcherMetrics.close();
    }

    /**
     * Happy path: a single fetch returns the three-record batch, the records come back
     * with consecutive offsets starting at 1, and the consumed position advances to 4.
     */
    @Test
    public void testFetchNormal() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        // The request is sent, but nothing is completed until a response is polled.
        Assert.assertEquals(1, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        client.prepareResponse(fetchResponse(records, Errors.NONE.code(), 100L, 0));
        consumerClient.poll(0L);
        Assert.assertTrue(fetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = fetcher.fetchedRecords();
        Assert.assertTrue(fetched.containsKey(tp));

        List<ConsumerRecord<byte[], byte[]>> partitionRecords = fetched.get(tp);
        Assert.assertEquals(3, partitionRecords.size());
        Assert.assertEquals(4L, subscriptions.position(tp).longValue());

        long expectedOffset = 1;
        for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
            Assert.assertEquals(expectedOffset, record.offset());
            expectedOffset++;
        }
    }

    /**
     * A fetch response carrying NOT_LEADER_FOR_PARTITION still counts as a completed
     * fetch, but yields no records for the partition.
     */
    @Test
    public void testFetchError() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        Assert.assertEquals(1, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        short notLeader = Errors.NOT_LEADER_FOR_PARTITION.code();
        client.prepareResponse(fetchResponse(records, notLeader, 100L, 0));
        consumerClient.poll(0L);
        Assert.assertTrue(fetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = fetcher.fetchedRecords();
        Assert.assertFalse(fetched.containsKey(tp));
    }

    /**
     * Builds a request matcher that accepts a fetch request only when it asks for the
     * given partition at exactly the given offset.
     *
     * @param topicPartition partition that must be present in the fetch request
     * @param j              fetch offset the request must carry for that partition
     * @return a matcher usable with {@code MockClient.prepareResponse}
     */
    private MockClient.RequestMatcher matchesOffset(final TopicPartition topicPartition, final long j) {
        return new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                Map<TopicPartition, FetchRequest.PartitionData> requested = ((FetchRequest) abstractRequest).fetchData();
                if (!requested.containsKey(topicPartition)) {
                    return false;
                }
                return requested.get(topicPartition).offset == j;
            }
        };
    }

    /**
     * A deserialization failure while draining fetched records must surface as a
     * {@link SerializationException} and must leave the consumed position untouched.
     */
    @Test
    public void testFetchedRecordsRaisesOnSerializationErrors() {
        // Pass-through deserializer that throws on its second invocation (call index 1).
        // It must override deserialize(...); under any other method name it is never
        // invoked by the fetcher and the test can only end at Assert.fail below.
        ByteArrayDeserializer failingDeserializer = new ByteArrayDeserializer() {
            private int calls = 0;

            @Override
            public byte[] deserialize(String topic, byte[] data) {
                if (calls++ == 1) {
                    throw new SerializationException();
                }
                return data;
            }
        };
        Fetcher<byte[], byte[]> failingFetcher =
                createFetcher(this.subscriptions, new Metrics(this.time), failingDeserializer, failingDeserializer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 1L);
        this.client.prepareResponse(matchesOffset(this.tp, 1L), (AbstractResponse) fetchResponse(this.records, Errors.NONE.code(), 100L, 0));

        Assert.assertEquals(1L, failingFetcher.sendFetches());
        this.consumerClient.poll(0L);
        try {
            failingFetcher.fetchedRecords();
            Assert.fail("fetchedRecords should have raised");
        } catch (SerializationException e) {
            // The position must not advance past the record that failed to deserialize.
            Assert.assertEquals(1L, this.subscriptions.position(this.tp).longValue());
        }
    }

    /**
     * Hand-writes two log entries into a buffer, the second with a deliberately wrong
     * checksum, and checks that draining the fetch raises a {@link KafkaException}
     * while the position stays at 0.
     */
    @Test
    public void testParseInvalidRecord() throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));

        final byte magic = (byte) 1;
        final long timestamp = 500L;
        byte[] key = "foo".getBytes();
        byte[] value = "baz".getBytes();
        int size = Record.recordSize(key, value);
        long crc = Record.computeChecksum(magic, Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);

        // First entry: written with the checksum computed above.
        out.writeLong(0L);
        out.writeInt(size);
        Record.write(out, magic, crc, Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);

        // Second entry: same payload, checksum off by one.
        out.writeLong(0L);
        out.writeInt(size);
        Record.write(out, magic, crc + 1, Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);

        buffer.flip();

        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);
        Assert.assertEquals(1, fetcher.sendFetches());
        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE.code(), 100L, 0));
        consumerClient.poll(0L);

        try {
            fetcher.fetchedRecords();
            Assert.fail("fetchedRecords should have raised");
        } catch (KafkaException e) {
            Assert.assertEquals(0L, subscriptions.position(tp).longValue());
        }
    }

    /**
     * With a fetcher created with a limit of 2, the three-record batch is handed out
     * as 2 + 1 records without a new fetch in between; only after it is drained is
     * another fetch sent, which returns the two-record follow-up batch.
     */
    @Test
    public void testFetchMaxPollRecords() {
        Fetcher<byte[], byte[]> limitedFetcher = createFetcher(subscriptions, new Metrics(time), 2);

        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 1L);

        // Responses are matched by the offset each fetch request asks for.
        client.prepareResponse(matchesOffset(tp, 1L), (AbstractResponse) fetchResponse(records, Errors.NONE.code(), 100L, 0));
        client.prepareResponse(matchesOffset(tp, 4L), (AbstractResponse) fetchResponse(nextRecords, Errors.NONE.code(), 100L, 0));

        // First drain: two of the three buffered records.
        Assert.assertEquals(1, limitedFetcher.sendFetches());
        consumerClient.poll(0L);
        List<ConsumerRecord<byte[], byte[]>> firstDrain = limitedFetcher.fetchedRecords().get(tp);
        Assert.assertEquals(2, firstDrain.size());
        Assert.assertEquals(3L, subscriptions.position(tp).longValue());
        Assert.assertEquals(1L, firstDrain.get(0).offset());
        Assert.assertEquals(2L, firstDrain.get(1).offset());

        // Second drain: the remaining record; no new fetch is sent while data is buffered.
        Assert.assertEquals(0, limitedFetcher.sendFetches());
        consumerClient.poll(0L);
        List<ConsumerRecord<byte[], byte[]>> secondDrain = limitedFetcher.fetchedRecords().get(tp);
        Assert.assertEquals(1, secondDrain.size());
        Assert.assertEquals(4L, subscriptions.position(tp).longValue());
        Assert.assertEquals(3L, secondDrain.get(0).offset());

        // Third drain: a fresh fetch returns the follow-up batch.
        Assert.assertTrue(limitedFetcher.sendFetches() > 0);
        consumerClient.poll(0L);
        List<ConsumerRecord<byte[], byte[]>> thirdDrain = limitedFetcher.fetchedRecords().get(tp);
        Assert.assertEquals(2, thirdDrain.size());
        Assert.assertEquals(6L, subscriptions.position(tp).longValue());
        Assert.assertEquals(4L, thirdDrain.get(0).offset());
        Assert.assertEquals(5L, thirdDrain.get(1).offset());
    }

    /**
     * Records with gaps between offsets (15, 20, 30) are all returned, and the
     * position ends up one past the last returned offset.
     */
    @Test
    public void testFetchNonContinuousRecords() {
        MemoryRecordsBuilder sparseBuilder =
                MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
        sparseBuilder.appendWithOffset(15L, 0L, "key".getBytes(), "value-1".getBytes());
        sparseBuilder.appendWithOffset(20L, 0L, "key".getBytes(), "value-2".getBytes());
        sparseBuilder.appendWithOffset(30L, 0L, "key".getBytes(), "value-3".getBytes());
        MemoryRecords sparseRecords = sparseBuilder.build();

        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        Assert.assertEquals(1, fetcher.sendFetches());
        client.prepareResponse(fetchResponse(sparseRecords, Errors.NONE.code(), 100L, 0));
        consumerClient.poll(0L);

        List<ConsumerRecord<byte[], byte[]>> consumed = fetcher.fetchedRecords().get(tp);
        Assert.assertEquals(3, consumed.size());
        Assert.assertEquals(31L, subscriptions.position(tp).longValue());

        long[] expectedOffsets = {15L, 20L, 30L};
        for (int idx = 0; idx < expectedOffsets.length; idx++) {
            Assert.assertEquals(expectedOffsets[idx], consumed.get(idx).offset());
        }
    }

    /**
     * When the node only advertises fetch API version 2, an incomplete record in the
     * response is reported as a {@link RecordTooLargeException} and the position does
     * not move. The node's API versions are restored afterwards.
     */
    @Test
    public void testFetchRequestWhenRecordTooLarge() {
        try {
            // Advertise FETCH with both version arguments set to 2.
            final short fetchVersion = (short) 2;
            ApiVersionsResponse.ApiVersion fetchApi =
                    new ApiVersionsResponse.ApiVersion(ApiKeys.FETCH.id, fetchVersion, fetchVersion);
            client.setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(fetchApi)));

            makeFetchRequestWithIncompleteRecord();
            try {
                fetcher.fetchedRecords();
                Assert.fail("RecordTooLargeException should have been raised");
            } catch (RecordTooLargeException e) {
                Assert.assertTrue(e.getMessage().startsWith("There are some messages at [Partition=Offset]: "));
                Assert.assertEquals(0L, subscriptions.position(tp).longValue());
            }
        } finally {
            // Reset so later use of the shared mock client sees the default versions.
            client.setNodeApiVersions(NodeApiVersions.create());
        }
    }

    /**
     * With the default node API versions, an incomplete record in the fetch response
     * is reported as a generic {@link KafkaException} ("Failed to make progress reading
     * messages") and the position does not move.
     */
    @Test
    public void testFetchRequestInternalError() {
        makeFetchRequestWithIncompleteRecord();
        try {
            this.fetcher.fetchedRecords();
            // Message names the exception this test actually expects.
            Assert.fail("KafkaException should have been raised");
        } catch (KafkaException e) {
            Assert.assertTrue(e.getMessage().startsWith("Failed to make progress reading messages"));
            Assert.assertEquals(0L, this.subscriptions.position(this.tp).longValue());
        }
    }

    /**
     * Assigns and seeks the test partition to offset 0, sends one fetch and completes
     * it with a response whose record payload is just eight zero bytes.
     */
    private void makeFetchRequestWithIncompleteRecord() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);
        Assert.assertEquals(1, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        // Eight zero bytes: a freshly allocated byte array is zero-filled.
        MemoryRecords partialRecord = MemoryRecords.readableRecords(ByteBuffer.wrap(new byte[8]));
        client.prepareResponse(fetchResponse(partialRecord, Errors.NONE.code(), 100L, 0));
        consumerClient.poll(0L);
        Assert.assertTrue(fetcher.hasCompletedFetches());
    }

    /**
     * A TOPIC_AUTHORIZATION_FAILED fetch response makes {@code fetchedRecords()} throw
     * a {@link TopicAuthorizationException} naming the test topic.
     */
    @Test
    public void testUnauthorizedTopic() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        Assert.assertEquals(1, fetcher.sendFetches());
        short authFailed = Errors.TOPIC_AUTHORIZATION_FAILED.code();
        client.prepareResponse(fetchResponse(records, authFailed, 100L, 0));
        consumerClient.poll(0L);

        try {
            fetcher.fetchedRecords();
            Assert.fail("fetchedRecords should have thrown");
        } catch (TopicAuthorizationException e) {
            Assert.assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
        }
    }

    /**
     * If the partition is re-assigned after the fetch was sent but before the response
     * arrives, the response yields no records.
     */
    @Test
    public void testFetchDuringRebalance() {
        subscriptions.subscribe(Collections.singleton(topicName), listener);
        subscriptions.assignFromSubscribed(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        Assert.assertEquals(1, fetcher.sendFetches());

        // Re-assign the same partition while the fetch is still in flight.
        subscriptions.assignFromSubscribed(Collections.singleton(tp));

        client.prepareResponse(fetchResponse(records, Errors.NONE.code(), 100L, 0));
        consumerClient.poll(0L);

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = fetcher.fetchedRecords();
        Assert.assertTrue(fetched.isEmpty());
    }

    /**
     * If the partition is paused while a fetch is in flight, the records in the
     * response are not returned for that partition.
     */
    @Test
    public void testInFlightFetchOnPausedPartition() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        Assert.assertEquals(1, fetcher.sendFetches());
        subscriptions.pause(tp);

        client.prepareResponse(fetchResponse(records, Errors.NONE.code(), 100L, 0));
        consumerClient.poll(0L);

        List<ConsumerRecord<byte[], byte[]>> pausedPartitionRecords = fetcher.fetchedRecords().get(tp);
        Assert.assertNull(pausedPartitionRecords);
    }

    /** No fetch is sent, and no request reaches the client, for a paused partition. */
    @Test
    public void testFetchOnPausedPartition() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);
        subscriptions.pause(tp);

        Assert.assertTrue(fetcher.sendFetches() <= 0);
        Assert.assertTrue(client.requests().isEmpty());
    }

    /**
     * NOT_LEADER_FOR_PARTITION yields no records and leaves the metadata due for an
     * immediate update (time to next update is 0).
     */
    @Test
    public void testFetchNotLeaderForPartition() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        Assert.assertEquals(1, fetcher.sendFetches());
        short notLeader = Errors.NOT_LEADER_FOR_PARTITION.code();
        client.prepareResponse(fetchResponse(records, notLeader, 100L, 0));
        consumerClient.poll(0L);

        Assert.assertEquals(0, fetcher.fetchedRecords().size());
        long now = time.milliseconds();
        Assert.assertEquals(0L, metadata.timeToNextUpdate(now));
    }

    /**
     * UNKNOWN_TOPIC_OR_PARTITION yields no records and leaves the metadata due for an
     * immediate update (time to next update is 0).
     */
    @Test
    public void testFetchUnknownTopicOrPartition() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        Assert.assertEquals(1, fetcher.sendFetches());
        short unknownPartition = Errors.UNKNOWN_TOPIC_OR_PARTITION.code();
        client.prepareResponse(fetchResponse(records, unknownPartition, 100L, 0));
        consumerClient.poll(0L);

        Assert.assertEquals(0, fetcher.fetchedRecords().size());
        long now = time.milliseconds();
        Assert.assertEquals(0L, metadata.timeToNextUpdate(now));
    }

    /**
     * With the EARLIEST reset strategy, OFFSET_OUT_OF_RANGE yields no records, flags
     * the partition as needing an offset reset and clears its position.
     */
    @Test
    public void testFetchOffsetOutOfRange() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);

        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
        this.consumerClient.poll(0L);

        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(this.tp));
        // assertNull states the intent directly and gives a clearer failure message
        // than comparing against a null cast to Object.
        Assert.assertNull(this.subscriptions.position(this.tp));
    }

    /**
     * An OFFSET_OUT_OF_RANGE response for a fetch issued before a seek is ignored:
     * no reset is requested and the position stays at the seeked offset.
     */
    @Test
    public void testStaleOutOfRangeError() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        Assert.assertEquals(1, fetcher.sendFetches());
        short outOfRange = Errors.OFFSET_OUT_OF_RANGE.code();
        client.prepareResponse(fetchResponse(records, outOfRange, 100L, 0));

        // Move the position before the queued response is delivered.
        subscriptions.seek(tp, 1L);
        consumerClient.poll(0L);

        Assert.assertEquals(0, fetcher.fetchedRecords().size());
        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp));
        Assert.assertEquals(1L, subscriptions.position(tp).longValue());
    }

    /**
     * With reset strategy NONE, an OFFSET_OUT_OF_RANGE response followed by a seek
     * results in an empty {@code fetchedRecords()} rather than an exception.
     */
    @Test
    public void testFetchedRecordsAfterSeek() {
        subscriptionsNoAutoReset.assignFromUser(Collections.singleton(tp));
        subscriptionsNoAutoReset.seek(tp, 0L);

        Assert.assertTrue(fetcherNoAutoReset.sendFetches() > 0);
        short outOfRange = Errors.OFFSET_OUT_OF_RANGE.code();
        client.prepareResponse(fetchResponse(records, outOfRange, 100L, 0));
        consumerClient.poll(0L);
        Assert.assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));

        // Seek away before draining the completed fetch.
        subscriptionsNoAutoReset.seek(tp, 2L);
        Assert.assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
    }

    /**
     * With reset strategy NONE, OFFSET_OUT_OF_RANGE makes the first
     * {@code fetchedRecords()} call throw an {@link OffsetOutOfRangeException} that
     * names exactly the test partition; the following call returns nothing.
     */
    @Test
    public void testFetchOffsetOutOfRangeException() {
        this.subscriptionsNoAutoReset.assignFromUser(Collections.singleton(this.tp));
        this.subscriptionsNoAutoReset.seek(this.tp, 0L);

        this.fetcherNoAutoReset.sendFetches();
        this.client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
        this.consumerClient.poll(0L);

        Assert.assertFalse(this.subscriptionsNoAutoReset.isOffsetResetNeeded(this.tp));
        try {
            this.fetcherNoAutoReset.fetchedRecords();
            Assert.fail("Should have thrown OffsetOutOfRangeException");
        } catch (OffsetOutOfRangeException e) {
            Assert.assertTrue(e.offsetOutOfRangePartitions().containsKey(this.tp));
            // Expected value first, actual second, so a failure reports them correctly.
            Assert.assertEquals(1L, e.offsetOutOfRangePartitions().size());
        }
        Assert.assertEquals(0L, this.fetcherNoAutoReset.fetchedRecords().size());
    }

    /**
     * A response prepared with the extra {@code true} flag (which the test name
     * indicates simulates a disconnect) yields no records and leaves the partition
     * fetchable at its old position, without requesting an offset reset.
     */
    @Test
    public void testFetchDisconnected() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        Assert.assertEquals(1, fetcher.sendFetches());
        client.prepareResponse((AbstractResponse) fetchResponse(records, Errors.NONE.code(), 100L, 0), true);
        consumerClient.poll(0L);
        Assert.assertEquals(0, fetcher.fetchedRecords().size());

        // Subscription state is unchanged by the failed fetch.
        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp));
        Assert.assertTrue(subscriptions.isFetchable(tp));
        Assert.assertEquals(0L, subscriptions.position(tp).longValue());
    }

    /** With a committed offset of 5, updating fetch positions sets the position to 5. */
    @Test
    public void testUpdateFetchPositionToCommitted() {
        subscriptions.assignFromUser(Collections.singleton(tp));

        OffsetAndMetadata committed = new OffsetAndMetadata(5L);
        subscriptions.committed(tp, committed);
        fetcher.updateFetchPositions(Collections.singleton(tp));

        Assert.assertTrue(subscriptions.isFetchable(tp));
        Assert.assertEquals(5L, subscriptions.position(tp).longValue());
    }

    /**
     * Without a committed offset, updating fetch positions issues a list-offset
     * request matched with -2 and adopts the returned offset (5) as the position.
     */
    @Test
    public void testUpdateFetchPositionResetToDefaultOffset() {
        subscriptions.assignFromUser(Collections.singleton(tp));

        // -2 is the value the matcher expects for this fixture's default (EARLIEST) strategy.
        final long expectedTimestamp = -2L;
        client.prepareResponse(listOffsetRequestMatcher(expectedTimestamp), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        fetcher.updateFetchPositions(Collections.singleton(tp));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp));
        Assert.assertTrue(subscriptions.isFetchable(tp));
        Assert.assertEquals(5L, subscriptions.position(tp).longValue());
    }

    /**
     * With a LATEST reset pending, updating fetch positions issues a list-offset
     * request matched with -1 and adopts the returned offset (5) as the position.
     */
    @Test
    public void testUpdateFetchPositionResetToLatestOffset() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);

        // -1 is the value the matcher expects for a LATEST reset.
        final long expectedTimestamp = -1L;
        client.prepareResponse(listOffsetRequestMatcher(expectedTimestamp), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        fetcher.updateFetchPositions(Collections.singleton(tp));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp));
        Assert.assertTrue(subscriptions.isFetchable(tp));
        Assert.assertEquals(5L, subscriptions.position(tp).longValue());
    }

    /**
     * With an EARLIEST reset pending, updating fetch positions issues a list-offset
     * request matched with -2 and adopts the returned offset (5) as the position.
     */
    @Test
    public void testUpdateFetchPositionResetToEarliestOffset() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);

        // -2 is the value the matcher expects for an EARLIEST reset.
        final long expectedTimestamp = -2L;
        client.prepareResponse(listOffsetRequestMatcher(expectedTimestamp), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        fetcher.updateFetchPositions(Collections.singleton(tp));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp));
        Assert.assertTrue(subscriptions.isFetchable(tp));
        Assert.assertEquals(5L, subscriptions.position(tp).longValue());
    }

    /**
     * Two list-offset responses are queued: the first prepared with the extra
     * {@code true} flag (which the test name indicates simulates a disconnect), the
     * second a normal one. The position must still end up at the returned offset (5).
     */
    @Test
    public void testUpdateFetchPositionDisconnect() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);

        final long expectedTimestamp = -1L;
        // First attempt: response prepared with the extra flag set.
        client.prepareResponse(listOffsetRequestMatcher(expectedTimestamp), listOffsetResponse(Errors.NONE, 1L, 5L), true);
        // Second attempt: regular response.
        client.prepareResponse(listOffsetRequestMatcher(expectedTimestamp), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        fetcher.updateFetchPositions(Collections.singleton(tp));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp));
        Assert.assertTrue(subscriptions.isFetchable(tp));
        Assert.assertEquals(5L, subscriptions.position(tp).longValue());
    }

    /**
     * A paused partition with a pending LATEST reset still gets its position updated
     * from the list-offset response (10), while remaining non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.committed(tp, new OffsetAndMetadata(0L));
        subscriptions.pause(tp);
        subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);

        final long expectedTimestamp = -1L;
        client.prepareResponse(listOffsetRequestMatcher(expectedTimestamp), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 10L));
        fetcher.updateFetchPositions(Collections.singleton(tp));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp));
        // Paused, so not fetchable even though the position is valid.
        Assert.assertFalse(subscriptions.isFetchable(tp));
        Assert.assertTrue(subscriptions.hasValidPosition(tp));
        Assert.assertEquals(10L, subscriptions.position(tp).longValue());
    }

    /**
     * A paused partition without a committed offset gets its position from the
     * list-offset response (0), while remaining non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithoutACommittedOffset() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.pause(tp);

        final long expectedTimestamp = -2L;
        client.prepareResponse(listOffsetRequestMatcher(expectedTimestamp), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 0L));
        fetcher.updateFetchPositions(Collections.singleton(tp));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp));
        // Paused, so not fetchable even though the position is valid.
        Assert.assertFalse(subscriptions.isFetchable(tp));
        Assert.assertTrue(subscriptions.hasValidPosition(tp));
        Assert.assertEquals(0L, subscriptions.position(tp).longValue());
    }

    /**
     * Pause first, then seek to 10: updating fetch positions keeps the seeked position
     * (10) rather than the committed offset (0), and the partition stays non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.committed(tp, new OffsetAndMetadata(0L));
        subscriptions.pause(tp);
        subscriptions.seek(tp, 10L);

        fetcher.updateFetchPositions(Collections.singleton(tp));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp));
        // Paused, so not fetchable even though the position is valid.
        Assert.assertFalse(subscriptions.isFetchable(tp));
        Assert.assertTrue(subscriptions.hasValidPosition(tp));
        Assert.assertEquals(10L, subscriptions.position(tp).longValue());
    }

    /**
     * Seek to 10 first, then pause: updating fetch positions keeps the seeked position
     * (10) rather than the committed offset (0), and the partition stays non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.committed(tp, new OffsetAndMetadata(0L));
        subscriptions.seek(tp, 10L);
        subscriptions.pause(tp);

        fetcher.updateFetchPositions(Collections.singleton(tp));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp));
        // Paused, so not fetchable even though the position is valid.
        Assert.assertFalse(subscriptions.isFetchable(tp));
        Assert.assertTrue(subscriptions.hasValidPosition(tp));
        Assert.assertEquals(10L, subscriptions.position(tp).longValue());
    }

    /** The all-topics metadata lookup returns as many topics as the test cluster has. */
    @Test
    public void testGetAllTopics() {
        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));

        int expectedTopicCount = cluster.topics().size();
        int actualTopicCount = fetcher.getAllTopicMetadata(5000L).size();
        Assert.assertEquals(expectedTopicCount, actualTopicCount);
    }

    /**
     * The all-topics lookup still succeeds when the first queued response is a null
     * response prepared with the extra {@code true} flag (which the test name indicates
     * simulates a disconnect) and the second is a normal metadata response.
     */
    @Test
    public void testGetAllTopicsDisconnect() {
        // First attempt: null response with the flag set.
        client.prepareResponse((AbstractResponse) null, true);
        // Second attempt: a normal metadata response.
        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));

        int expectedTopicCount = cluster.topics().size();
        int actualTopicCount = fetcher.getAllTopicMetadata(5000L).size();
        Assert.assertEquals(expectedTopicCount, actualTopicCount);
    }

    /**
     * With no response prepared, the all-topics lookup is expected to end in a
     * {@link TimeoutException} once the given timeout elapses.
     */
    @Test(expected = TimeoutException.class)
    public void testGetAllTopicsTimeout() {
        final long timeoutMs = 50L;
        fetcher.getAllTopicMetadata(timeoutMs);
    }

    /**
     * A metadata response carrying TOPIC_AUTHORIZATION_FAILED makes the all-topics
     * lookup throw a {@link TopicAuthorizationException} naming the test topic.
     */
    @Test
    public void testGetAllTopicsUnauthorized() {
        client.prepareResponse(newMetadataResponse(topicName, Errors.TOPIC_AUTHORIZATION_FAILED));
        try {
            final long timeoutMs = 10L;
            fetcher.getAllTopicMetadata(timeoutMs);
            Assert.fail();
        } catch (TopicAuthorizationException e) {
            Assert.assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
        }
    }

    /**
     * A metadata response carrying INVALID_TOPIC_EXCEPTION is expected to make the
     * topic metadata lookup throw {@link InvalidTopicException}.
     */
    @Test(expected = InvalidTopicException.class)
    public void testGetTopicMetadataInvalidTopic() {
        client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION));

        MetadataRequest.Builder request = new MetadataRequest.Builder(Collections.singletonList(topicName));
        fetcher.getTopicMetadata(request, 5000L);
    }

    /**
     * A metadata response carrying UNKNOWN_TOPIC_OR_PARTITION results in a lookup
     * result that has no entry for the topic.
     */
    @Test
    public void testGetTopicMetadataUnknownTopic() {
        client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION));

        MetadataRequest.Builder request = new MetadataRequest.Builder(Collections.singletonList(topicName));
        Assert.assertNull(fetcher.getTopicMetadata(request, 5000L).get(topicName));
    }

    /**
     * A LEADER_NOT_AVAILABLE metadata response followed by a successful one ends with
     * the topic present in the lookup result.
     */
    @Test
    public void testGetTopicMetadataLeaderNotAvailable() {
        // First response reports the error, the second one succeeds.
        client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE));
        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));

        MetadataRequest.Builder request = new MetadataRequest.Builder(Collections.singletonList(topicName));
        Assert.assertTrue(fetcher.getTopicMetadata(request, 5000L).containsKey(topicName));
    }

    /**
     * Performs three fetches whose responses carry throttle times of 100, 200 and 300
     * and checks the resulting average (200) and maximum (300) throttle-time metrics.
     */
    @Test
    public void testQuotaMetrics() throws Exception {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        for (int round = 1; round <= 3; round++) {
            MemoryRecordsBuilder batch =
                    MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
            // Three records per round at offsets round*3 .. round*3+2.
            for (int v = 0; v < 3; v++) {
                byte[] value = String.format("value-%d", v).getBytes();
                batch.appendWithOffset((round * 3) + v, -1L, "key".getBytes(), value);
            }
            int throttleTimeMs = 100 * round;
            List<ConsumerRecord<byte[], byte[]>> fetched =
                    fetchRecords(batch.build(), Errors.NONE.code(), 100L, throttleTimeMs).get(tp);
            Assert.assertEquals(3, fetched.size());
        }

        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
        KafkaMetric avgThrottle = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup, ""));
        KafkaMetric maxThrottle = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup, ""));
        Assert.assertEquals(200.0d, avgThrottle.value(), EPSILON);
        Assert.assertEquals(300.0d, maxThrottle.value(), EPSILON);
    }

    /**
     * Checks the {@code records-lag-max} metric and the per-partition {@code <tp>.records-lag}
     * metric across two fetches, and that the per-partition metric disappears from the registry
     * after unsubscribing.
     */
    @Test
    public void testFetcherMetrics() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);

        MetricName maxLagName = this.metrics.metricName("records-lag-max", this.metricGroup, "");
        MetricName partitionLagName = this.metrics.metricName(this.tp + ".records-lag", this.metricGroup, "");
        Map<MetricName, KafkaMetric> allMetrics = this.metrics.metrics();
        KafkaMetric maxLag = allMetrics.get(maxLagName);

        // Nothing fetched yet.
        Assert.assertEquals(Double.NEGATIVE_INFINITY, maxLag.value(), EPSILON);

        // Empty fetch with 100 as the third fetchRecords argument: both lag metrics read 100.
        fetchRecords(MemoryRecords.EMPTY, Errors.NONE.code(), 100L, 0);
        Assert.assertEquals(100.0d, maxLag.value(), EPSILON);
        KafkaMetric partitionLag = allMetrics.get(partitionLagName);
        Assert.assertEquals(100.0d, partitionLag.value(), EPSILON);

        // Fetch three records (offsets 0..2) with 200 as the third argument: max lag reads 197.
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
        for (int offset = 0; offset < 3; offset++) {
            byte[] key = "key".getBytes();
            byte[] value = String.format("value-%d", offset).getBytes();
            builder.appendWithOffset(offset, -1L, key, value);
        }
        fetchRecords(builder.build(), Errors.NONE.code(), 200L, 0);
        Assert.assertEquals(197.0d, maxLag.value(), EPSILON);

        // Unsubscribing must remove the per-partition lag metric.
        this.subscriptions.unsubscribe();
        Assert.assertFalse(allMetrics.containsKey(partitionLagName));
    }

    /**
     * Runs one fetch round trip against the mock client and returns what the fetcher hands back.
     *
     * <p>Asserts that {@code sendFetches()} returns 1, queues a response built by
     * {@code fetchResponse(memoryRecords, s, j, i)}, polls the consumer client with a timeout
     * of 0 and finally returns {@code fetchedRecords()}.
     *
     * @param memoryRecords records to place in the fetch response
     * @param s             error code for the partition in the response
     * @param j             long value forwarded to the partition data of the response
     * @param i             int value forwarded as the second {@code FetchResponse} argument
     * @return the records returned by the fetcher, keyed by partition
     */
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords memoryRecords, short s, long j, int i) {
        int requestsSent = this.fetcher.sendFetches();
        Assert.assertEquals(1L, requestsSent);

        FetchResponse response = fetchResponse(memoryRecords, s, j, i);
        this.client.prepareResponse(response);
        this.consumerClient.poll(0L);

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = this.fetcher.fetchedRecords();
        return fetched;
    }

    /**
     * Looking up offsets by timestamp for partition 2 of the test topic with a 100 ms timeout,
     * without any response queued on the mock client, must fail with a {@link TimeoutException}.
     *
     * <p>The expectation is declared on the annotation instead of an empty {@code catch} block,
     * so the expected exception is no longer swallowed silently.
     */
    @Test(expected = TimeoutException.class)
    public void testGetOffsetsForTimesTimeout() {
        Map<TopicPartition, Long> timestampsToSearch =
                Collections.singletonMap(new TopicPartition(this.topicName, 2), 1000L);
        this.fetcher.getOffsetsByTimes(timestampsToSearch, 100L);
    }

    /**
     * Drives {@code getOffsetsByTimes} through an empty request and through a series of
     * error/retry scenarios delegated to {@code testGetOffsetsForTimesWithError}.
     *
     * <p>Each delegated call lists, in order: the error of the first response for the first
     * partition, the error of the first response for the second partition, the value answered
     * for each of the two partitions, and the value expected for each ({@code null} meaning
     * "no entry expected").
     */
    @Test
    public void testGetOffsetsForTimes() {
        // An empty search map yields an empty result.
        Map<TopicPartition, Long> nothingToSearch = new HashMap<>();
        Assert.assertTrue(this.fetcher.getOffsetsByTimes(nothingToSearch, 100L).isEmpty());

        // No errors; an answered value of -1 is expected to produce no entry.
        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L);
        // No errors; both partitions resolve.
        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 100L, 10L, 100L);

        // Errors on the first round that still end with both partitions resolved.
        testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L);
        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION, 10L, 100L, 10L, 100L);
        testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
        testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);

        // An unsupported message format leaves the first partition without an entry.
        testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, 100L, null, 100L);

        testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
    }

    /**
     * Runs one offsets-by-times lookup for two partitions of the test topic against scripted
     * list-offset responses and verifies the outcome per partition.
     *
     * <p>Two rounds of responses are queued per partition leader: the first round carries the
     * supplied errors, the second round carries {@code Errors.NONE}. Each response uses the
     * partition's value for both of its long fields.
     *
     * @param errors  error of the first-round response for {@code this.tp}
     * @param errors2 error of the first-round response for partition 1 of the topic
     * @param j       value placed in the responses for {@code this.tp}
     * @param j2      value placed in the responses for partition 1
     * @param l       expected timestamp and offset for {@code this.tp}, or {@code null} if no entry is expected
     * @param l2      expected timestamp and offset for partition 1, or {@code null} if no entry is expected
     */
    private void testGetOffsetsForTimesWithError(Errors errors, Errors errors2, long j, long j2, Long l, Long l2) {
        this.client.reset();
        TopicPartition firstPartition = this.tp;
        TopicPartition secondPartition = new TopicPartition(this.topicName, 1);

        Cluster testCluster = TestUtils.clusterWith(2, this.topicName, 2);
        this.metadata.update(testCluster, Collections.emptySet(), this.time.milliseconds());
        Node firstLeader = testCluster.leaderFor(firstPartition);
        Node secondLeader = testCluster.leaderFor(secondPartition);

        // Round one: responses carrying the errors under test.
        this.client.prepareResponseFrom(listOffsetResponse(firstPartition, errors, j, j), firstLeader);
        this.client.prepareResponseFrom(listOffsetResponse(secondPartition, errors2, j2, j2), secondLeader);
        // Round two: clean responses.
        this.client.prepareResponseFrom(listOffsetResponse(firstPartition, Errors.NONE, j, j), firstLeader);
        this.client.prepareResponseFrom(listOffsetResponse(secondPartition, Errors.NONE, j2, j2), secondLeader);

        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(firstPartition, 0L);
        timestampsToSearch.put(secondPartition, 0L);
        Map<TopicPartition, OffsetAndTimestamp> found = this.fetcher.getOffsetsByTimes(timestampsToSearch, Long.MAX_VALUE);

        OffsetAndTimestamp firstResult = found.get(firstPartition);
        if (l != null) {
            Assert.assertEquals(l.longValue(), firstResult.timestamp());
            Assert.assertEquals(l.longValue(), firstResult.offset());
        } else {
            Assert.assertNull(firstResult);
        }

        OffsetAndTimestamp secondResult = found.get(secondPartition);
        if (l2 != null) {
            Assert.assertEquals(l2.longValue(), secondResult.timestamp());
            Assert.assertEquals(l2.longValue(), secondResult.offset());
        } else {
            Assert.assertNull(secondResult);
        }
    }

    /**
     * Verifies that when one fetched partition fails with {@code OFFSET_OUT_OF_RANGE}, the
     * position of the healthy partition advances by exactly the number of records handed to the
     * caller, so no record is skipped because of the exception.
     *
     * <p>{@code fetchedRecords()} is drained twice; across both attempts exactly three records
     * and exactly one {@link OffsetOutOfRangeException} (naming only {@code tp}) are expected.
     */
    @Test
    public void testFetchPositionAfterException() {
        this.subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp, this.tp1}));
        this.subscriptionsNoAutoReset.seek(this.tp, 1L);
        this.subscriptionsNoAutoReset.seek(this.tp1, 1L);
        Assert.assertEquals(1L, this.fetcherNoAutoReset.sendFetches());

        // tp answers with an out-of-range error while tp1 answers with real records.
        Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
        partitions.put(this.tp, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE.code(), 100L, MemoryRecords.EMPTY));
        partitions.put(this.tp1, new FetchResponse.PartitionData(Errors.NONE.code(), 100L, this.records));
        this.client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
        this.consumerClient.poll(0L);

        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>();
        List<OffsetOutOfRangeException> exceptions = new ArrayList<>();

        // First drain: collect whatever is returned, remembering an out-of-range failure if raised.
        try {
            for (List<ConsumerRecord<byte[], byte[]>> partitionRecords : this.fetcherNoAutoReset.fetchedRecords().values()) {
                fetchedRecords.addAll(partitionRecords);
            }
        } catch (OffsetOutOfRangeException e) {
            exceptions.add(e);
        }

        // tp1 was sought to 1, so its position minus one must equal the records received so far.
        Assert.assertEquals(fetchedRecords.size(), this.subscriptionsNoAutoReset.position(this.tp1).longValue() - 1);

        // Second drain: picks up whatever the first attempt did not deliver.
        try {
            for (List<ConsumerRecord<byte[], byte[]>> partitionRecords : this.fetcherNoAutoReset.fetchedRecords().values()) {
                fetchedRecords.addAll(partitionRecords);
            }
        } catch (OffsetOutOfRangeException e) {
            exceptions.add(e);
        }

        Assert.assertEquals(4L, this.subscriptionsNoAutoReset.position(this.tp1).longValue());
        Assert.assertEquals(3L, fetchedRecords.size());

        // Exactly one out-of-range exception, reporting only partition tp.
        Assert.assertEquals(1L, exceptions.size());
        OffsetOutOfRangeException outOfRange = exceptions.get(0);
        Assert.assertTrue(outOfRange.offsetOutOfRangePartitions().containsKey(this.tp));
        Assert.assertEquals(1L, outOfRange.offsetOutOfRangePartitions().size());
    }

    /**
     * Verifies that seeking a partition whose fetch came back with {@code OFFSET_OUT_OF_RANGE}
     * before the error is surfaced prevents the exception: the final {@code fetchedRecords()}
     * call returns an empty result instead of throwing.
     *
     * <p>The fetcher is created with 2 as the last {@code createFetcher} argument; the
     * assertions expect two records from the first {@code fetchedRecords()} call and one from
     * the second (presumably a per-call record cap — confirm against {@code Fetcher}).
     */
    @Test
    public void testSeekBeforeException() {
        Fetcher<byte[], byte[]> limitedFetcher = createFetcher(this.subscriptionsNoAutoReset, new Metrics(this.time), 2);

        this.subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp}));
        this.subscriptionsNoAutoReset.seek(this.tp, 1L);
        Assert.assertEquals(1L, limitedFetcher.sendFetches());

        this.client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertEquals(2L, limitedFetcher.fetchedRecords().get(this.tp).size());

        // Add tp1 to the assignment and let its fetch fail with an out-of-range error.
        this.subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp, this.tp1}));
        this.subscriptionsNoAutoReset.seek(this.tp1, 1L);
        Assert.assertEquals(1L, limitedFetcher.sendFetches());

        Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
        partitions.put(this.tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE.code(), 100L, MemoryRecords.EMPTY));
        this.client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
        this.consumerClient.poll(0L);

        // One record for tp is still handed out on this call.
        Assert.assertEquals(1L, limitedFetcher.fetchedRecords().get(this.tp).size());

        // Seeking tp1 before the error is surfaced: the next call returns nothing and does not throw.
        this.subscriptionsNoAutoReset.seek(this.tp1, 10L);
        Assert.assertEquals(0L, limitedFetcher.fetchedRecords().size());
    }

    /**
     * Builds a request matcher that accepts a request only when the timestamp it carries for
     * {@code this.tp} equals {@code j}.
     *
     * <p>The matcher casts the request to {@code ListOffsetRequest} and unboxes the timestamp
     * looked up for {@code this.tp}, so it throws if the request has another type or holds no
     * entry for that partition.
     *
     * @param j the timestamp the request must contain for {@code this.tp}
     * @return the matcher
     */
    private MockClient.RequestMatcher listOffsetRequestMatcher(final long j) {
        return new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                ListOffsetRequest listOffsetRequest = (ListOffsetRequest) abstractRequest;
                Long requestedTimestamp = (Long) listOffsetRequest.partitionTimestamps().get(FetcherTest.this.tp);
                return requestedTimestamp.longValue() == j;
            }
        };
    }

    /**
     * Convenience overload that builds a list-offset response for {@code this.tp}.
     *
     * @param errors error to report for the partition
     * @param j      first long value of the partition data
     * @param j2     second long value of the partition data
     * @return the response produced by the four-argument overload
     */
    private ListOffsetResponse listOffsetResponse(Errors errors, long j, long j2) {
        TopicPartition defaultPartition = this.tp;
        return listOffsetResponse(defaultPartition, errors, j, j2);
    }

    /**
     * Builds a list-offset response containing a single partition entry.
     *
     * @param topicPartition partition the entry is keyed by
     * @param errors         error whose code is stored in the entry
     * @param j              first long value passed to the partition data
     *                       (presumably the timestamp — confirm against {@code ListOffsetResponse.PartitionData})
     * @param j2             second long value passed to the partition data (presumably the offset — confirm)
     * @return a {@code ListOffsetResponse} built from the single-entry map and the constant 1
     */
    private ListOffsetResponse listOffsetResponse(TopicPartition topicPartition, Errors errors, long j, long j2) {
        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
        responseData.put(topicPartition, new ListOffsetResponse.PartitionData(errors.code(), j, j2));
        return new ListOffsetResponse(responseData, 1);
    }

    /**
     * Builds a fetch response holding a single entry for {@code this.tp}.
     *
     * @param memoryRecords records placed in the partition data
     * @param s             error code placed in the partition data
     * @param j             long value placed in the partition data
     * @param i             int value passed as the second {@code FetchResponse} constructor argument
     * @return the fetch response
     */
    private FetchResponse fetchResponse(MemoryRecords memoryRecords, short s, long j, int i) {
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<>();
        partitions.put(this.tp, new FetchResponse.PartitionData(s, j, memoryRecords));
        return new FetchResponse(partitions, i);
    }

    /**
     * Builds a metadata response describing one topic.
     *
     * <p>When {@code errors} is {@code Errors.NONE} the topic's partitions are copied from
     * {@code this.cluster}; for any other error the topic is reported with no partitions.
     *
     * @param str    name of the topic to describe
     * @param errors topic-level error to report
     * @return a response listing the nodes of {@code this.cluster} and the single topic entry
     */
    private MetadataResponse newMetadataResponse(String str, Errors errors) {
        List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
        if (errors == Errors.NONE) {
            for (PartitionInfo info : this.cluster.partitionsForTopic(str)) {
                List<Node> replicas = Arrays.asList(info.replicas());
                List<Node> inSyncReplicas = Arrays.asList(info.inSyncReplicas());
                partitionsMetadata.add(new MetadataResponse.PartitionMetadata(
                        Errors.NONE, info.partition(), info.leader(), replicas, inSyncReplicas));
            }
        }

        MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(errors, str, false, partitionsMetadata);
        return new MetadataResponse(this.cluster.nodes(), (String) null, -1, Arrays.asList(topicMetadata));
    }

    /**
     * Creates a byte-array fetcher, using a fresh {@code ByteArrayDeserializer} for keys and
     * another for values.
     *
     * @param subscriptionState subscription state the fetcher works on
     * @param metrics           metrics registry handed to the fetcher
     * @param i                 int setting forwarded unchanged to the five-argument overload
     * @return the new fetcher
     */
    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptionState, Metrics metrics, int i) {
        ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
        ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
        return createFetcher(subscriptionState, metrics, keyDeserializer, valueDeserializer, i);
    }

    /**
     * Creates a byte-array fetcher with {@code Integer.MAX_VALUE} as the int setting.
     *
     * @param subscriptionState subscription state the fetcher works on
     * @param metrics           metrics registry handed to the fetcher
     * @return the new fetcher
     */
    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptionState, Metrics metrics) {
        final int largestValue = Integer.MAX_VALUE;
        return createFetcher(subscriptionState, metrics, largestValue);
    }

    /**
     * Creates a fetcher with caller-supplied deserializers and {@code Integer.MAX_VALUE} as the
     * int setting.
     *
     * @param subscriptionState subscription state the fetcher works on
     * @param metrics           metrics registry handed to the fetcher
     * @param deserializer      key deserializer
     * @param deserializer2     value deserializer
     * @return the new fetcher
     */
    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptionState, Metrics metrics, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        final int largestValue = Integer.MAX_VALUE;
        return createFetcher(subscriptionState, metrics, deserializer, deserializer2, largestValue);
    }

    /**
     * Constructs a {@code Fetcher} wired to this test's consumer client, metadata, clock and
     * fetch settings.
     *
     * @param subscriptionState subscription state the fetcher works on
     * @param metrics           metrics registry handed to the fetcher
     * @param deserializer      key deserializer
     * @param deserializer2     value deserializer
     * @param i                 int passed as the sixth constructor argument
     *                          (presumably the max records per poll — confirm against {@code Fetcher})
     * @return the new fetcher
     */
    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptionState, Metrics metrics, Deserializer<K> deserializer, Deserializer<V> deserializer2, int i) {
        String prefixedGroup = "consumer" + this.groupId;
        return new Fetcher<>(
                this.consumerClient,
                this.minBytes,
                this.maxBytes,
                this.maxWaitMs,
                this.fetchSize,
                i,
                true,
                deserializer,
                deserializer2,
                this.metadata,
                subscriptionState,
                metrics,
                prefixedGroup,
                this.time,
                this.retryBackoffMs);
    }
}
