package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.class */
public class AsyncKafkaConsumerTest {
    // Consumer under test. Assigned by individual tests and closed/cleared again in resetAll().
    private AsyncKafkaConsumer<String, String> consumer = null;
    // Mock clock passed to every consumer built by the newConsumer(...) helpers; some tests replace it
    // with a different MockTime before creating the consumer.
    private Time time = new MockTime(0);
    // Mockito mocks injected into the consumer by the newConsumer(...) helpers so tests can stub and verify them.
    private final FetchCollector<String, String> fetchCollector = (FetchCollector) Mockito.mock(FetchCollector.class);
    private final ApplicationEventHandler applicationEventHandler = (ApplicationEventHandler) Mockito.mock(ApplicationEventHandler.class);
    private final ConsumerMetadata metadata = (ConsumerMetadata) Mockito.mock(ConsumerMetadata.class);
    // Real queue handed to the consumer; tests add background events to it directly and resetAll() clears it.
    private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
    private final CompletableEventReaper backgroundEventReaper = (CompletableEventReaper) Mockito.mock(CompletableEventReaper.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest$MockCommitCallback.class */
    public static class MockCommitCallback implements OffsetCommitCallback {
        public int invoked = 0;
        public Exception exception = null;
        public String completionThread;

        private MockCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            this.invoked++;
            this.completionThread = Thread.currentThread().getName();
            this.exception = exc;
        }
    }

    /**
     * Per-test teardown: drains the background event queue, closes the consumer (if a test created one)
     * and resets the Mockito inline mocks and the interceptor counters.
     */
    @AfterEach
    public void resetAll() {
        backgroundEventQueue.clear();
        if (consumer != null) {
            try {
                consumer.close(Duration.ZERO);
            } catch (Exception ignored) {
                // Teardown is best effort: a failure while closing must not mask the test's own outcome.
            }
        }
        consumer = null;
        Mockito.framework().clearInlineMocks();
        MockConsumerInterceptor.resetCounters();
    }

    /**
     * Builds a consumer from the required test configuration with {@code group.id} set to {@code "group-id"}.
     *
     * @return a consumer wired to this test's mocks
     */
    private AsyncKafkaConsumer<String, String> newConsumer() {
        final Properties props = TestUtils.requiredConsumerConfig();
        props.put("group.id", "group-id");
        return newConsumer(props);
    }

    /**
     * Builds a consumer from the required test configuration without adding a {@code group.id}.
     *
     * @return a consumer wired to this test's mocks
     */
    private AsyncKafkaConsumer<String, String> newConsumerWithoutGroupId() {
        final Properties props = TestUtils.requiredConsumerConfig();
        return newConsumer(props);
    }

    /**
     * Builds a consumer from the given properties, wired to this test's mocks.
     *
     * <p>{@code enable.auto.commit} is set to {@code false} unless the caller already specified it.
     * Note that the passed-in {@code properties} object is modified in that case.
     *
     * @param properties consumer configuration
     * @return the new consumer
     */
    private AsyncKafkaConsumer<String, String> newConsumer(Properties properties) {
        properties.putIfAbsent("enable.auto.commit", false);
        // The ConsumerConfig overload performs the exact same wiring of mocks.
        final ConsumerConfig config = new ConsumerConfig(properties);
        return newConsumer(config);
    }

    /**
     * Builds a consumer from the given config. Every factory argument ignores its inputs and hands back
     * one of this test's mocks, so the consumer talks only to test doubles.
     *
     * @param consumerConfig consumer configuration
     * @return the new consumer
     */
    private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig consumerConfig) {
        return new AsyncKafkaConsumer<>(
            consumerConfig,
            new StringDeserializer(),
            new StringDeserializer(),
            time,
            (logCtx, clock, eventQueue, reaper, first, second, third, asyncMetrics) -> applicationEventHandler,
            logCtx -> backgroundEventReaper,
            (logCtx, consumerMetadata, subscriptions, fetchConfig, deserializers, fetchMetrics, clock) -> fetchCollector,
            (config, subscriptions, logCtx, resourceListeners) -> metadata,
            backgroundEventQueue);
    }

    /**
     * Builds a consumer through the constructor that takes all collaborators directly, combining the
     * supplied collaborators with this test's mocks.
     *
     * @param fetchBuffer fetch buffer to use
     * @param consumerInterceptors interceptors to use
     * @param consumerRebalanceListenerInvoker rebalance listener invoker to use
     * @param subscriptionState subscription state to use
     * @param str passed as the second-to-last constructor argument (call sites in this class pass {@code "group-id"})
     * @param str2 passed as the second constructor argument (call sites in this class pass {@code "client-id"})
     * @param z passed as the last constructor argument
     * @return the new consumer
     */
    private AsyncKafkaConsumer<String, String> newConsumer(FetchBuffer fetchBuffer, ConsumerInterceptors<String, String> consumerInterceptors, ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker, SubscriptionState subscriptionState, String str, String str2, boolean z) {
        final LogContext logContext = new LogContext();
        final Deserializers<String, String> deserializers =
            new Deserializers<>(new StringDeserializer(), new StringDeserializer());
        final Metrics metrics = new Metrics();
        return new AsyncKafkaConsumer<>(
            logContext,
            str2,
            deserializers,
            fetchBuffer,
            fetchCollector,
            consumerInterceptors,
            time,
            applicationEventHandler,
            backgroundEventQueue,
            backgroundEventReaper,
            consumerRebalanceListenerInvoker,
            metrics,
            subscriptionState,
            metadata,
            100L,
            30000,
            1000,
            str,
            z);
    }

    /** Creating a consumer and closing it right away must not throw. */
    @Test
    public void testSuccessfulStartupShutdown() {
        consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();
        Assertions.assertDoesNotThrow(() -> consumer.close());
    }

    /** Calling {@code assignment()} on a closed consumer throws {@link IllegalStateException} with a fixed message. */
    @Test
    public void testFailOnClosedConsumer() {
        consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();
        consumer.close();

        final IllegalStateException thrown =
            Assertions.assertThrows(IllegalStateException.class, consumer::assignment);
        Assertions.assertEquals("This consumer has already been closed.", thrown.getMessage());
    }

    /**
     * {@code commitAsync} with a {@code null} callback enqueues an {@link AsyncCommitEvent} carrying the
     * given offsets and keeps working after that event's future completes. The final zero-timeout close
     * is expected to fail with a {@link KafkaException} caused by a {@link TimeoutException}.
     */
    @Test
    public void testCommitAsyncWithNullCallback() {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("t0", 2), new OffsetAndMetadata(10L));
        offsets.put(new TopicPartition("t0", 3), new OffsetAndMetadata(20L));

        markOffsetsReadyForCommitEvent();
        consumer.commitAsync(offsets, (OffsetCommitCallback) null);

        final ArgumentCaptor<AsyncCommitEvent> commitCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
        Mockito.verify(applicationEventHandler).add(commitCaptor.capture());
        final AsyncCommitEvent commitEvent = commitCaptor.getValue();
        Assertions.assertTrue(commitEvent.offsets().isPresent());
        Assertions.assertEquals(offsets, commitEvent.offsets().get());

        commitEvent.future().complete(offsets);
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(offsets, (OffsetCommitCallback) null));

        try {
            final KafkaException closeFailure =
                Assertions.assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
            Assertions.assertInstanceOf(TimeoutException.class, closeFailure.getCause());
        } finally {
            // Already closed above; clear the field so teardown does not try to close it again.
            consumer = null;
        }
    }

    /**
     * A user-supplied callback passed to {@code commitAsync} is invoked exactly once and without an
     * exception when the commit completes successfully.
     */
    @Test
    public void testCommitAsyncUserSuppliedCallbackNoException() {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
        completeCommitAsyncApplicationEventSuccessfully();

        final MockCommitCallback callback = new MockCommitCallback();
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
        forceCommitCallbackInvocation();

        // Expected value goes first so that a failure message reports expected/actual the right way round.
        Assertions.assertEquals(1, callback.invoked);
        Assertions.assertNull(callback.exception);
    }

    /**
     * When the async commit fails, the user-supplied callback receives an exception of the same class
     * as the one the commit was completed with.
     *
     * @param exc the exception the commit event is completed with (see {@code commitExceptionSupplier})
     */
    @ParameterizedTest
    @MethodSource("commitExceptionSupplier")
    public void testCommitAsyncUserSuppliedCallbackWithException(Exception exc) {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
        completeCommitAsyncApplicationEventExceptionally(exc);

        final MockCommitCallback callback = new MockCommitCallback();
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
        forceCommitCallbackInvocation();

        Assertions.assertSame(exc.getClass(), callback.exception.getClass());
    }

    /**
     * Argument source for {@code testCommitAsyncUserSuppliedCallbackWithException}.
     *
     * @return the exceptions an async commit is failed with, one per test invocation
     */
    private static Stream<Exception> commitExceptionSupplier() {
        // Pass the elements directly: an Object[]-typed array cannot be inferred as Stream<Exception>.
        return Stream.of(
            new KafkaException("Test exception"),
            new GroupAuthorizationException("Group authorization exception"));
    }

    /**
     * {@code committed} returns the offsets supplied by the stubbed event handler, sends a
     * {@link FetchCommittedOffsetsEvent} and records a positive {@code committed-time-ns-total} metric.
     */
    @Test
    public void testCommitted() {
        time = new MockTime(1L);
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> expectedOffsets = mockTopicPartitionOffset();
        completeFetchedCommittedOffsetApplicationEventSuccessfully(expectedOffsets);

        final Map<TopicPartition, OffsetAndMetadata> actualOffsets =
            consumer.committed(expectedOffsets.keySet(), Duration.ofMillis(1000L));
        Assertions.assertEquals(expectedOffsets, actualOffsets);
        Mockito.verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));

        final Metric committedTime = consumer.metrics()
            .get(consumer.metricsRegistry().metricName("committed-time-ns-total", "consumer-metrics"));
        final Double committedTimeNs = (Double) committedTime.metricValue();
        Assertions.assertTrue(committedTimeNs.doubleValue() > 0.0d);
    }

    /** A {@link KafkaException} thrown by the event handler while fetching committed offsets reaches the caller. */
    @Test
    public void testCommittedExceptionThrown() {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
        Mockito.when(applicationEventHandler.addAndGet(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class)))
            .thenAnswer(invocation -> {
                final CompletableApplicationEvent<?> event = invocation.getArgument(0);
                Assertions.assertInstanceOf(FetchCommittedOffsetsEvent.class, event);
                throw new KafkaException("Test exception");
            });

        Assertions.assertThrows(
            KafkaException.class,
            () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000L)));
    }

    /**
     * A wakeup requested before polling makes the next {@code poll} throw {@link WakeupException};
     * the poll after that one succeeds.
     */
    @Test
    public void testWakeupBeforeCallingPoll() {
        consumer = newConsumer();
        final TopicPartition partition = new TopicPartition("foo", 3);
        Mockito.doReturn(Fetch.empty()).when(fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when(applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))
            .thenReturn(true);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        completeAssignmentChangeEventSuccessfully();
        consumer.assign(Collections.singleton(partition));

        consumer.wakeup();

        markReconcileAndAutoCommitCompleteForPollEvent();
        Assertions.assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
        Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
    }

    /**
     * A wakeup triggered while the first (empty) fetch is being collected makes that {@code poll} throw
     * {@link WakeupException}; a following poll succeeds.
     */
    @Test
    public void testWakeupAfterEmptyFetch() {
        consumer = newConsumer();
        final TopicPartition partition = new TopicPartition("foo", 3);
        // First collectFetch call wakes the consumer up; later calls just return an empty fetch.
        Mockito.doAnswer(invocation -> {
            consumer.wakeup();
            return Fetch.empty();
        }).doAnswer(invocation -> Fetch.empty())
            .when(fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when(applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))
            .thenReturn(true);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        completeAssignmentChangeEventSuccessfully();
        consumer.assign(Collections.singleton(partition));

        markReconcileAndAutoCommitCompleteForPollEvent();
        Assertions.assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1L)));
        Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
    }

    /**
     * A wakeup triggered while a non-empty fetch is being collected does not fail that {@code poll};
     * the {@link WakeupException} surfaces on the following poll instead.
     */
    @Test
    public void testWakeupAfterNonEmptyFetch() {
        consumer = newConsumer();
        final TopicPartition partition = new TopicPartition("foo", 3);
        final List<ConsumerRecord<String, String>> records = Arrays.asList(
            new ConsumerRecord<>("foo", 3, 2L, "key1", "value1"),
            new ConsumerRecord<>("foo", 3, 3L, "key2", "value2"));
        Mockito.doAnswer(invocation -> {
            consumer.wakeup();
            return Fetch.forPartition(partition, records, true, new OffsetAndMetadata(4L, Optional.of(0), ""));
        }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
        Mockito.when(applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))
            .thenReturn(true);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        completeAssignmentChangeEventSuccessfully();
        consumer.assign(Collections.singleton(partition));

        markReconcileAndAutoCommitCompleteForPollEvent();
        Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1L)));
        Assertions.assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
    }

    /**
     * {@code commitSync} may be called from inside {@code onPartitionsRevoked} without throwing. The
     * callback is driven by a pre-queued {@link ConsumerRebalanceListenerCallbackNeededEvent} and a
     * single {@code poll}.
     */
    @Test
    public void testCommitInRebalanceCallback() {
        consumer = newConsumer();
        final TopicPartition partition = new TopicPartition("foo", 3);
        Mockito.doAnswer(invocation -> Fetch.empty())
            .when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
        Mockito.when(applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))
            .thenReturn(true);

        final SortedSet<TopicPartition> revokedPartitions =
            new TreeSet<>(AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR);
        revokedPartitions.add(partition);
        backgroundEventQueue.add(new ConsumerRebalanceListenerCallbackNeededEvent(
            ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, revokedPartitions));
        completeCommitSyncApplicationEventSuccessfully();

        final AtomicBoolean revokedCallbackRan = new AtomicBoolean(false);
        final ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                Assertions.assertDoesNotThrow(() ->
                    consumer.commitSync(Utils.mkMap(Utils.mkEntry(partition, new OffsetAndMetadata(0L)))));
                revokedCallbackRan.set(true);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // Not exercised by this test.
            }
        };

        completeTopicSubscriptionChangeEventSuccessfully();
        consumer.subscribe(Collections.singletonList("foo"), listener);
        markReconcileAndAutoCommitCompleteForPollEvent();
        consumer.poll(Duration.ZERO);
        Assertions.assertTrue(revokedCallbackRan.get());
    }

    /** After a poll that returned records, a second poll must not throw. */
    @Test
    public void testClearWakeupTriggerAfterPoll() {
        consumer = newConsumer();
        final TopicPartition partition = new TopicPartition("foo", 3);
        final List<ConsumerRecord<String, String>> records = Arrays.asList(
            new ConsumerRecord<>("foo", 3, 2L, "key1", "value1"),
            new ConsumerRecord<>("foo", 3, 3L, "key2", "value2"));
        Mockito.doReturn(Fetch.forPartition(partition, records, true, new OffsetAndMetadata(4L, Optional.of(0), "")))
            .when(fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when(applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))
            .thenReturn(true);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        completeAssignmentChangeEventSuccessfully();
        consumer.assign(Collections.singleton(partition));

        markReconcileAndAutoCommitCompleteForPollEvent();
        consumer.poll(Duration.ZERO);
        Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
    }

    /** The commit callback runs exactly once, on the thread that is running the test. */
    @Test
    public void testEnsureCallbackExecutedByApplicationThread() {
        consumer = newConsumer();
        final String testThreadName = Thread.currentThread().getName();
        final MockCommitCallback callback = new MockCommitCallback();
        completeCommitAsyncApplicationEventSuccessfully();

        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
        forceCommitCallbackInvocation();

        Assertions.assertEquals(1, callback.invoked);
        Assertions.assertEquals(testThreadName, callback.completionThread);
    }

    /**
     * {@code commitSync} surfaces the exception thrown by a callback that was registered through an
     * earlier {@code commitAsync}.
     */
    @Test
    public void testEnsureCommitSyncExecutedCommitAsyncCallbacks() {
        consumer = newConsumer();
        final KafkaException callbackFailure = new KafkaException("Async commit callback failed");
        final OffsetCommitCallback throwingCallback = (offsets, error) -> {
            throw callbackFailure;
        };

        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), throwingCallback));
        Assertions.assertThrows(callbackFailure.getClass(), () -> consumer.commitSync());
    }

    /**
     * With an async commit still pending, {@code commitSync} of empty offsets times out; once the
     * pending commit has completed (here: exceptionally), the same call succeeds.
     */
    @Test
    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
        final TopicPartition partition = new TopicPartition("foo", 0);
        final CompletableFuture<Void> pendingAsyncCommit = setUpConsumerWithIncompleteAsyncCommit(partition);

        Assertions.assertThrows(
            TimeoutException.class,
            () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100L)));

        pendingAsyncCommit.completeExceptionally(new KafkaException("Test exception"));

        Assertions.assertDoesNotThrow(
            () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100L)));
    }

    /**
     * With an async commit still pending, {@code commitSync} of explicit offsets times out; once the
     * pending commit has completed successfully, the same call succeeds.
     */
    @Test
    public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() {
        final TopicPartition partition = new TopicPartition("foo", 0);
        final CompletableFuture<Void> pendingAsyncCommit = setUpConsumerWithIncompleteAsyncCommit(partition);
        completeCommitSyncApplicationEventSuccessfully();

        Assertions.assertThrows(
            TimeoutException.class,
            () -> consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(20L)), Duration.ofMillis(100L)));

        pendingAsyncCommit.complete(null);

        Assertions.assertDoesNotThrow(
            () -> consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(20L)), Duration.ofMillis(100L)));
    }

    /**
     * With an async commit still pending, {@code commitSync} times out; after the pending commit fails,
     * a following {@code commitSync} still succeeds, i.e. the async failure is not propagated to it.
     */
    @Test
    public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() {
        final TopicPartition partition = new TopicPartition("foo", 0);
        final CompletableFuture<Void> pendingAsyncCommit = setUpConsumerWithIncompleteAsyncCommit(partition);
        completeCommitSyncApplicationEventSuccessfully();

        Assertions.assertThrows(
            TimeoutException.class,
            () -> consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(20L)), Duration.ofMillis(100L)));

        pendingAsyncCommit.completeExceptionally(new KafkaException("Test exception"));

        Assertions.assertDoesNotThrow(
            () -> consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(20L)), Duration.ofMillis(100L)));
    }

    /**
     * Creates the consumer under test, assigns it {@code topicPartition}, seeks to offset 20 and issues a
     * {@code commitAsync()} whose event is left uncompleted.
     *
     * @param topicPartition the partition to assign and seek
     * @return the future of the last event passed to {@code applicationEventHandler.add}, which the
     *         caller completes to finish the async commit
     */
    private CompletableFuture<Void> setUpConsumerWithIncompleteAsyncCommit(TopicPartition topicPartition) {
        time = new MockTime(1L);
        consumer = newConsumer();
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        completeAssignmentChangeEventSuccessfully();
        consumer.assign(Collections.singleton(topicPartition));
        completeSeekUnvalidatedEventSuccessfully();
        consumer.seek(topicPartition, 20L);
        markOffsetsReadyForCommitEvent();
        consumer.commitAsync();

        // Bind the type parameter through a typed local: chaining .future() directly onto the generic
        // call would infer Object and not match the declared CompletableFuture<Void> return type.
        final CompletableApplicationEvent<Void> pendingCommit = getLastEnqueuedEvent();
        return pendingCommit.future();
    }

    /**
     * Returns the most recent event passed to {@code applicationEventHandler.add}, also verifying that
     * {@code add} was called at least once.
     *
     * @param <T> result type the caller expects of the event (unchecked)
     * @return the last captured event
     */
    @SuppressWarnings("unchecked")
    private <T> CompletableApplicationEvent<T> getLastEnqueuedEvent() {
        final ArgumentCaptor<CompletableApplicationEvent<T>> captor =
            ArgumentCaptor.forClass(CompletableApplicationEvent.class);
        Mockito.verify(applicationEventHandler, Mockito.atLeast(1)).add(captor.capture());
        final List<CompletableApplicationEvent<T>> captured = captor.getAllValues();
        final int lastIndex = captured.size() - 1;
        return captured.get(lastIndex);
    }

    /**
     * Returns the most recent event passed to {@code applicationEventHandler.addAndGet}, also verifying
     * that {@code addAndGet} was called at least once.
     *
     * @param <T> result type the caller expects of the event (unchecked)
     * @return the last captured event
     */
    @SuppressWarnings("unchecked")
    private <T> CompletableApplicationEvent<T> addAndGetLastEnqueuedEvent() {
        final ArgumentCaptor<CompletableApplicationEvent<T>> captor =
            ArgumentCaptor.forClass(CompletableApplicationEvent.class);
        Mockito.verify(applicationEventHandler, Mockito.atLeast(1)).addAndGet(captor.capture());
        final List<CompletableApplicationEvent<T>> captured = captor.getAllValues();
        final int lastIndex = captured.size() - 1;
        return captured.get(lastIndex);
    }

    /** Checks, via {@code assertMockCommitCallbackInvoked}, the callback state around a {@code poll} following a {@code commitAsync}. */
    @Test
    public void testEnsurePollExecutedCommitAsyncCallbacks() {
        consumer = newConsumer();
        final MockCommitCallback callback = new MockCommitCallback();
        completeCommitAsyncApplicationEventSuccessfully();
        Mockito.doReturn(Fetch.empty()).when(fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when(applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))
            .thenReturn(true);
        completeAssignmentChangeEventSuccessfully();
        consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));

        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));

        markReconcileAndAutoCommitCompleteForPollEvent();
        assertMockCommitCallbackInvoked(() -> {
            consumer.poll(Duration.ZERO);
        }, callback, null);
    }

    /** Checks, via {@code assertMockCommitCallbackInvoked}, the callback state around a {@code close} following a {@code commitAsync}. */
    @Test
    public void testEnsureShutdownExecutedCommitAsyncCallbacks() {
        consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();
        final MockCommitCallback callback = new MockCommitCallback();
        completeCommitAsyncApplicationEventSuccessfully();

        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));

        assertMockCommitCallbackInvoked(() -> {
            consumer.close();
        }, callback, null);
    }

    /**
     * Closing the consumer sends a {@link CommitOnCloseEvent} through {@code add} and a
     * {@link LeaveGroupOnCloseEvent} through {@code addAndGet}.
     */
    @Test
    public void testVerifyApplicationEventOnShutdown() {
        consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();
        // The (Object) cast selects the single-value doReturn overload for the null literal.
        Mockito.doReturn((Object) null).when(applicationEventHandler).addAndGet(ArgumentMatchers.any());

        consumer.close();

        Mockito.verify(applicationEventHandler).add(ArgumentMatchers.any(CommitOnCloseEvent.class));
        Mockito.verify(applicationEventHandler).addAndGet(ArgumentMatchers.any(LeaveGroupOnCloseEvent.class));
    }

    /**
     * Closing the consumer sends a {@link LeaveGroupOnCloseEvent}, both with a zero and a non-zero timeout.
     *
     * @param j close timeout in milliseconds
     */
    @ParameterizedTest
    @ValueSource(longs = {0, 30000})
    public void testCloseLeavesGroup(long j) {
        final AsyncKafkaConsumer<String, String> delegate = newConsumer(
            Mockito.mock(FetchBuffer.class),
            Mockito.mock(ConsumerInterceptors.class),
            Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            Mockito.mock(SubscriptionState.class),
            "group-id",
            "client-id",
            false);
        consumer = Mockito.spy(delegate);

        consumer.close(Duration.ofMillis(j));

        Mockito.verify(applicationEventHandler).addAndGet(ArgumentMatchers.any(LeaveGroupOnCloseEvent.class));
    }

    /**
     * When the rebalance listener invoker returns an error from {@code invokePartitionsLost}, {@code close}
     * throws a {@link KafkaException} whose cause is that error, and a {@link LeaveGroupOnCloseEvent}
     * is still sent.
     */
    @Test
    public void testCloseLeavesGroupDespiteOnPartitionsLostError() {
        final KafkaException rootError = new KafkaException("Intentional error");
        final Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("topic1", 0));

        final SubscriptionState subscriptions = Mockito.mock(SubscriptionState.class);
        Mockito.when(subscriptions.assignedPartitions()).thenReturn(partitions);
        final ConsumerRebalanceListenerInvoker invoker = Mockito.mock(ConsumerRebalanceListenerInvoker.class);
        Mockito.doAnswer(invocation -> rootError)
            .when(invoker).invokePartitionsLost(ArgumentMatchers.any(SortedSet.class));

        consumer = Mockito.spy(newConsumer(
            Mockito.mock(FetchBuffer.class),
            new ConsumerInterceptors<>(Collections.emptyList()),
            invoker,
            subscriptions,
            "group-id",
            "client-id",
            false));
        consumer.setGroupAssignmentSnapshot(partitions);

        final Throwable closeFailure =
            Assertions.assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
        Assertions.assertNotNull(closeFailure.getCause());
        Assertions.assertEquals(rootError, closeFailure.getCause());
        Mockito.verify(applicationEventHandler).addAndGet(ArgumentMatchers.any(LeaveGroupOnCloseEvent.class));
    }

    /**
     * When every {@code addAndGet} call throws {@code InterruptException}, {@code close(Duration)}
     * must propagate that exception yet still hand both the {@code CommitOnCloseEvent} and the
     * {@code LeaveGroupOnCloseEvent} to the application event handler.
     *
     * @param j close timeout in milliseconds (0 and 30000 are exercised)
     */
    @ValueSource(longs = {0, 30000})
    @ParameterizedTest
    public void testCloseLeavesGroupDespiteInterrupt(long j) {
        Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("topic1", 0));
        SubscriptionState subscriptions = Mockito.mock(SubscriptionState.class);
        Mockito.when(subscriptions.assignedPartitions()).thenReturn(partitions);
        Mockito.when(applicationEventHandler.addAndGet(ArgumentMatchers.any(CompletableApplicationEvent.class)))
            .thenThrow(InterruptException.class);

        consumer = Mockito.spy(newConsumer(
            Mockito.mock(FetchBuffer.class),
            Mockito.mock(ConsumerInterceptors.class),
            Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            "group-id",
            "client-id",
            false));

        Duration closeTimeout = Duration.ofMillis(j);
        try {
            Assertions.assertThrows(InterruptException.class, () -> consumer.close(closeTimeout));
        } finally {
            // Always clear the interrupt flag so it cannot leak into later tests on this thread.
            Thread.interrupted();
        }

        Mockito.verify(applicationEventHandler)
            .add(ArgumentMatchers.any(CommitOnCloseEvent.class));
        Mockito.verify(applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(LeaveGroupOnCloseEvent.class));
    }

    /**
     * {@code commitSyncAllConsumed} must enqueue a {@code SyncCommitEvent} whose optional
     * offsets are empty.
     */
    @Test
    public void testCommitSyncAllConsumed() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        consumer = newConsumer(
            Mockito.mock(FetchBuffer.class),
            Mockito.mock(ConsumerInterceptors.class),
            Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            "group-id",
            "client-id",
            false);

        completeTopicSubscriptionChangeEventSuccessfully();
        consumer.subscribe(Collections.singleton("topic"), Mockito.mock(ConsumerRebalanceListener.class));
        TopicPartition partition = new TopicPartition("topic", 0);
        subscriptions.assignFromSubscribed(Collections.singleton(partition));
        completeSeekUnvalidatedEventSuccessfully();
        subscriptions.seek(partition, 100L);
        markOffsetsReadyForCommitEvent();

        consumer.commitSyncAllConsumed(time.timer(100L));

        ArgumentCaptor<SyncCommitEvent> eventCaptor = ArgumentCaptor.forClass(SyncCommitEvent.class);
        Mockito.verify(applicationEventHandler).add(eventCaptor.capture());
        SyncCommitEvent submitted = eventCaptor.getValue();
        Assertions.assertFalse(submitted.offsets().isPresent(), "Expected empty optional offsets");
    }

    /**
     * Closing a consumer created with the trailing boolean factory argument set to
     * {@code false} must never enqueue a {@code SyncCommitEvent}.
     */
    @Test
    public void testAutoCommitSyncDisabled() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        consumer = newConsumer(
            Mockito.mock(FetchBuffer.class),
            Mockito.mock(ConsumerInterceptors.class),
            Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            "group-id",
            "client-id",
            false);

        completeTopicSubscriptionChangeEventSuccessfully();
        consumer.subscribe(Collections.singleton("topic"), Mockito.mock(ConsumerRebalanceListener.class));
        TopicPartition partition = new TopicPartition("topic", 0);
        subscriptions.assignFromSubscribed(Collections.singleton(partition));
        completeSeekUnvalidatedEventSuccessfully();
        subscriptions.seek(partition, 100L);
        completeUnsubscribeApplicationEventSuccessfully();

        consumer.close();

        Mockito.verify(applicationEventHandler, Mockito.never())
            .add(ArgumentMatchers.any(SyncCommitEvent.class));
    }

    /**
     * Runs {@code executable}, asserting it does not throw, then checks that the callback was
     * invoked exactly once and carries the exception implied by {@code errors}.
     *
     * @param executable         action expected to complete without throwing
     * @param mockCommitCallback callback whose invocation count and exception are inspected
     * @param errors             expected error; {@code null} means the callback must hold no
     *                           exception, a retriable error means it must hold a
     *                           {@code RetriableCommitFailedException}; other errors are not checked
     */
    private void assertMockCommitCallbackInvoked(Executable executable, MockCommitCallback mockCommitCallback, Errors errors) {
        Assertions.assertDoesNotThrow(executable);
        Assertions.assertEquals(1, mockCommitCallback.invoked);

        if (errors == null) {
            Assertions.assertNull(mockCommitCallback.exception);
            return;
        }
        boolean retriable = errors.exception() instanceof RetriableException;
        if (retriable) {
            Assertions.assertInstanceOf(RetriableCommitFailedException.class, mockCommitCallback.exception);
        }
    }

    /**
     * Assigning a single partition leaves the subscription empty, records the partition in the
     * assignment, and submits an {@code AssignmentChangeEvent} through {@code addAndGet}.
     */
    @Test
    public void testAssign() {
        consumer = newConsumer();
        TopicPartition partition = new TopicPartition("foo", 3);
        completeAssignmentChangeEventSuccessfully();

        consumer.assign(Collections.singleton(partition));

        Assertions.assertTrue(consumer.subscription().isEmpty());
        Assertions.assertTrue(consumer.assignment().contains(partition));
        Mockito.verify(applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(AssignmentChangeEvent.class));
    }

    /** {@code assign(null)} must be rejected with an {@code IllegalArgumentException}. */
    @Test
    public void testAssignOnNullTopicPartition() {
        consumer = newConsumer();
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> consumer.assign((Collection<TopicPartition>) null));
    }

    /** Assigning an empty list leaves both the subscription and the assignment empty. */
    @Test
    public void testAssignOnEmptyTopicPartition() {
        consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();

        consumer.assign(Collections.emptyList());

        Assertions.assertTrue(consumer.subscription().isEmpty());
        Assertions.assertTrue(consumer.assignment().isEmpty());
    }

    /** A partition whose topic name is {@code null} must be rejected by {@code assign}. */
    @Test
    public void testAssignOnNullTopicInPartition() {
        consumer = newConsumer();
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> consumer.assign(Collections.singleton(new TopicPartition((String) null, 0))));
    }

    /** A partition whose topic name is only whitespace must be rejected by {@code assign}. */
    @Test
    public void testAssignOnEmptyTopicInPartition() {
        consumer = newConsumer();
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> consumer.assign(Collections.singleton(new TopicPartition("  ", 0))));
    }

    /** {@code beginningOffsets} with a {@code null} collection must throw {@code NullPointerException}. */
    @Test
    public void testBeginningOffsetsFailsIfNullPartitions() {
        consumer = newConsumer();
        Assertions.assertThrows(
            NullPointerException.class,
            () -> consumer.beginningOffsets((Collection<TopicPartition>) null, Duration.ofMillis(1L)));
    }

    /**
     * Checks that {@code beginningOffsets} returns, for every partition in the stubbed
     * list-offsets result, the offset carried by that result, and that a
     * {@code ListOffsetsEvent} is submitted through {@code addAndGet}.
     */
    @Test
    public void testBeginningOffsets() {
        this.consumer = newConsumer();
        Map<TopicPartition, OffsetAndTimestampInternal> mockOffsetAndTimestamp = mockOffsetAndTimestamp();
        // Stub the handler: fail the test if the event's remaining time (deadline minus the
        // current mock time) is zero, otherwise answer with the prepared offsets.
        Mockito.when((Map) this.applicationEventHandler.addAndGet((CompletableApplicationEvent) ArgumentMatchers.any(ListOffsetsEvent.class))).thenAnswer(invocationOnMock -> {
            if (this.time.timer(((ListOffsetsEvent) invocationOnMock.getArgument(0)).deadlineMs() - this.time.milliseconds()).remainingMs() == 0) {
                Assertions.fail("Timer duration should not be zero.");
            }
            return mockOffsetAndTimestamp;
        });
        Map map = (Map) Assertions.assertDoesNotThrow(() -> {
            return this.consumer.beginningOffsets(mockOffsetAndTimestamp.keySet(), Duration.ofMillis(1L));
        });
        // Every stubbed partition must be present in the result with the matching offset.
        mockOffsetAndTimestamp.forEach((topicPartition, offsetAndTimestampInternal) -> {
            Assertions.assertTrue(map.containsKey(topicPartition));
            Assertions.assertEquals(offsetAndTimestampInternal.offset(), (Long) map.get(topicPartition));
        });
        ((ApplicationEventHandler) Mockito.verify(this.applicationEventHandler)).addAndGet((CompletableApplicationEvent) ArgumentMatchers.any(ListOffsetsEvent.class));
    }

    /**
     * A {@code KafkaException} thrown while processing the {@code ListOffsetsEvent} must surface
     * unchanged from {@code beginningOffsets}.
     */
    @Test
    public void testBeginningOffsetsThrowsKafkaExceptionForUnderlyingExecutionFailure() {
        consumer = newConsumer();
        Set<TopicPartition> partitions = mockTopicPartitionOffset().keySet();
        Throwable processingFailure = new KafkaException("Unexpected failure processing List Offsets event");
        Mockito.doThrow(processingFailure)
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(ListOffsetsEvent.class));

        Throwable thrown = Assertions.assertThrows(
            KafkaException.class,
            () -> consumer.beginningOffsets(partitions, Duration.ofMillis(1L)));

        Assertions.assertEquals(processingFailure, thrown);
        Mockito.verify(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    /**
     * A {@code TimeoutException} raised by the event handler must make {@code beginningOffsets}
     * throw a {@code TimeoutException}, after a {@code ListOffsetsEvent} has been submitted.
     */
    @Test
    public void testBeginningOffsetsTimeoutOnEventProcessingTimeout() {
        consumer = newConsumer();
        Mockito.doThrow(new TimeoutException())
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.any());

        Assertions.assertThrows(
            TimeoutException.class,
            () -> consumer.beginningOffsets(
                Collections.singletonList(new TopicPartition("t1", 0)),
                Duration.ofMillis(1L)));

        Mockito.verify(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    /** {@code offsetsForTimes} with a {@code null} map must throw {@code NullPointerException}. */
    @Test
    public void testOffsetsForTimesOnNullPartitions() {
        consumer = newConsumer();
        Assertions.assertThrows(
            NullPointerException.class,
            () -> consumer.offsetsForTimes((Map<TopicPartition, Long>) null, Duration.ofMillis(1L)));
    }

    /**
     * {@code offsetsForTimes} must reject each of the negative target timestamps -2, -1 and -3
     * with an {@code IllegalArgumentException}.
     */
    @Test
    public void testOffsetsForTimesFailsOnNegativeTargetTimes() {
        consumer = newConsumer();
        // Checked in the same order as before: -2, then -1, then -3.
        for (long negativeTimestamp : new long[] {-2L, -1L, -3L}) {
            Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> consumer.offsetsForTimes(
                    Collections.singletonMap(new TopicPartition("topic1", 1), negativeTimestamp),
                    Duration.ofMillis(1L)));
        }
    }

    /**
     * {@code offsetsForTimes} must map every partition of the stubbed result to the value built
     * by {@code buildOffsetAndTimestamp()}, and must submit a {@code ListOffsetsEvent}.
     */
    @Test
    public void testOffsetsForTimes() {
        consumer = newConsumer();
        Map<TopicPartition, OffsetAndTimestampInternal> stubbedOffsets = mockOffsetAndTimestamp();
        Map<TopicPartition, Long> timestampsToSearch = mockTimestampToSearch();
        Mockito.doReturn(stubbedOffsets)
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.any());

        Map<TopicPartition, ?> result = Assertions.assertDoesNotThrow(
            () -> consumer.offsetsForTimes(timestampsToSearch, Duration.ofMillis(1L)));

        stubbedOffsets.forEach((partition, internal) ->
            Assertions.assertEquals(internal.buildOffsetAndTimestamp(), result.get(partition)));
        Mockito.verify(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    /**
     * When the handler throws a {@code TimeoutException}, {@code offsetsForTimes} must throw a
     * {@code TimeoutException} whose message names the requested timeout.
     */
    @Test
    public void testOffsetsForTimesTimeoutException() {
        consumer = newConsumer();
        long timeoutMs = 100;
        Mockito.doThrow(new TimeoutException("Event did not complete in time and was expired by the reaper"))
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.any());

        String expectedMessage = "Failed to get offsets by times in " + timeoutMs + "ms";
        Throwable thrown = Assertions.assertThrows(
            TimeoutException.class,
            () -> consumer.offsetsForTimes(mockTimestampToSearch(), Duration.ofMillis(timeoutMs)));

        Assertions.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * When the handler throws a {@code TimeoutException}, {@code beginningOffsets} must throw a
     * {@code TimeoutException} whose message names the requested timeout.
     */
    @Test
    public void testBeginningOffsetsTimeoutException() {
        consumer = newConsumer();
        long timeoutMs = 100;
        Mockito.doThrow(new TimeoutException("Event did not complete in time and was expired by the reaper"))
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.any());

        String expectedMessage = "Failed to get offsets by times in " + timeoutMs + "ms";
        Throwable thrown = Assertions.assertThrows(
            TimeoutException.class,
            () -> consumer.beginningOffsets(
                Collections.singleton(new TopicPartition("topic", 5)),
                Duration.ofMillis(timeoutMs)));

        Assertions.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * When the handler throws a {@code TimeoutException}, {@code endOffsets} must throw a
     * {@code TimeoutException} whose message names the requested timeout.
     */
    @Test
    public void testEndOffsetsTimeoutException() {
        consumer = newConsumer();
        long timeoutMs = 100;
        Mockito.doThrow(new TimeoutException("Event did not complete in time and was expired by the reaper"))
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.any());

        String expectedMessage = "Failed to get offsets by times in " + timeoutMs + "ms";
        Throwable thrown = Assertions.assertThrows(
            TimeoutException.class,
            () -> consumer.endOffsets(
                Collections.singleton(new TopicPartition("topic", 5)),
                Duration.ofMillis(timeoutMs)));

        Assertions.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * With a zero timeout, {@code beginningOffsets} returns the requested partition mapped to
     * {@code null} and hands a {@code ListOffsetsEvent} to {@code add}.
     */
    @Test
    public void testBeginningOffsetsWithZeroTimeout() {
        consumer = newConsumer();
        TopicPartition partition = new TopicPartition("topic1", 0);

        Map<TopicPartition, Long> offsets = Assertions.assertDoesNotThrow(
            () -> consumer.beginningOffsets(Collections.singletonList(partition), Duration.ZERO));

        Assertions.assertTrue(offsets.containsKey(partition));
        Assertions.assertNull(offsets.get(partition));
        Mockito.verify(applicationEventHandler)
            .add(ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    /**
     * With a zero timeout, {@code offsetsForTimes} returns the requested partition mapped to
     * {@code null} and never submits a {@code ListOffsetsEvent} through {@code addAndGet}.
     */
    @Test
    public void testOffsetsForTimesWithZeroTimeout() {
        consumer = newConsumer();
        TopicPartition partition = new TopicPartition("topic1", 0);
        Map<TopicPartition, Object> expected = Collections.singletonMap(partition, null);
        Map<TopicPartition, Long> timestampsToSearch = Collections.singletonMap(partition, 5L);

        Object actual = Assertions.assertDoesNotThrow(
            () -> consumer.offsetsForTimes(timestampsToSearch, Duration.ZERO));

        Assertions.assertEquals(expected, actual);
        Mockito.verify(applicationEventHandler, Mockito.never())
            .addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    /**
     * Checks that calling {@code wakeup()} before {@code committed(...)} makes
     * {@code committed} throw a {@code WakeupException} and leaves no pending task on the
     * wakeup trigger.
     */
    @Test
    public void testWakeupCommitted() {
        this.consumer = newConsumer();
        Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset = mockTopicPartitionOffset();
        // Stub the handler: the submitted event must be a FetchCommittedOffsetsEvent whose future
        // is already completed exceptionally; the answer then resolves that future via
        // ConsumerUtils.getResult.
        ((ApplicationEventHandler) Mockito.doAnswer(invocationOnMock -> {
            CompletableApplicationEvent completableApplicationEvent = (CompletableApplicationEvent) invocationOnMock.getArgument(0);
            Assertions.assertInstanceOf(FetchCommittedOffsetsEvent.class, completableApplicationEvent);
            Assertions.assertTrue(completableApplicationEvent.future().isCompletedExceptionally());
            return ConsumerUtils.getResult(completableApplicationEvent.future());
        }).when(this.applicationEventHandler)).addAndGet((CompletableApplicationEvent) ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        // Wake up first, so the wakeup is already requested when committed() runs.
        this.consumer.wakeup();
        Assertions.assertThrows(WakeupException.class, () -> {
            this.consumer.committed(mockTopicPartitionOffset.keySet());
        });
        Assertions.assertNull(this.consumer.wakeupTrigger().getPendingTask());
    }

    /**
     * A {@code wakeup()} requested before {@code close} must not fail the commit issued during
     * close: a {@code SyncCommitEvent} has to be enqueued and its future must not be completed
     * exceptionally.
     */
    @Test
    public void testNoWakeupInCloseCommit() {
        TopicPartition partition = new TopicPartition("topic1", 0);
        Properties props = requiredConsumerConfigAndGroupId("consumer-group");
        props.put("enable.auto.commit", true);
        consumer = newConsumer(props);

        completeAssignmentChangeEventSuccessfully();
        consumer.assign(Collections.singleton(partition));
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch())
            .when(metadata)
            .currentLeader(ArgumentMatchers.any());
        completeSeekUnvalidatedEventSuccessfully();
        consumer.seek(partition, 10L);
        consumer.wakeup();

        // Capture the sync commit enqueued during close() and mark its offsets ready;
        // all other events passed to add() are ignored.
        AtomicReference<SyncCommitEvent> capturedEvent = new AtomicReference<>();
        Mockito.doAnswer(invocation -> {
            ApplicationEvent event = invocation.getArgument(0);
            if (event instanceof SyncCommitEvent) {
                SyncCommitEvent commitEvent = (SyncCommitEvent) event;
                capturedEvent.set(commitEvent);
                commitEvent.markOffsetsReady();
            }
            return null;
        }).when(applicationEventHandler).add(ArgumentMatchers.any());
        completeUnsubscribeApplicationEventSuccessfully();

        consumer.close(Duration.ZERO);

        Assertions.assertNotNull(capturedEvent.get());
        Assertions.assertFalse(capturedEvent.get().future().isCompletedExceptionally());
    }

    /**
     * If an async commit is still pending when {@code close(10 ms)} runs, close must throw a
     * {@code KafkaException} whose cause is a {@code TimeoutException}.
     */
    @Test
    public void testCloseAwaitPendingAsyncCommitIncomplete() {
        time = new MockTime(1L);
        consumer = newConsumer();
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch())
            .when(metadata)
            .currentLeader(ArgumentMatchers.any());

        TopicPartition partition = new TopicPartition("foo", 0);
        completeAssignmentChangeEventSuccessfully();
        consumer.assign(Collections.singleton(partition));
        completeSeekUnvalidatedEventSuccessfully();
        consumer.seek(partition, 20L);
        markOffsetsReadyForCommitEvent();
        consumer.commitAsync();

        Exception thrown = Assertions.assertThrows(
            KafkaException.class,
            () -> consumer.close(Duration.ofMillis(10L)));
        Assertions.assertInstanceOf(TimeoutException.class, thrown.getCause());
    }

    /**
     * If the async commit completes successfully, {@code close(10 ms)} must not throw and the
     * commit callback must have been invoked exactly once.
     */
    @Test
    public void testCloseAwaitPendingAsyncCommitComplete() {
        time = new MockTime(1L);
        consumer = newConsumer();
        MockCommitCallback commitCallback = new MockCommitCallback();
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch())
            .when(metadata)
            .currentLeader(ArgumentMatchers.any());

        TopicPartition partition = new TopicPartition("foo", 0);
        completeAssignmentChangeEventSuccessfully();
        consumer.assign(Collections.singleton(partition));
        completeSeekUnvalidatedEventSuccessfully();
        consumer.seek(partition, 20L);
        completeCommitAsyncApplicationEventSuccessfully();
        consumer.commitAsync(commitCallback);
        completeUnsubscribeApplicationEventSuccessfully();

        Assertions.assertDoesNotThrow(() -> consumer.close(Duration.ofMillis(10L)));
        Assertions.assertEquals(1, commitCallback.invoked);
    }

    /**
     * With auto-commit enabled and a {@code MockConsumerInterceptor} configured, closing the
     * consumer must leave the interceptor's commit and close counters at one each.
     */
    @Test
    public void testInterceptorAutoCommitOnClose() {
        Properties props = requiredConsumerConfigAndGroupId("test-id");
        props.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName());
        props.setProperty("enable.auto.commit", "true");

        consumer = newConsumer(props);
        Assertions.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());

        completeCommitSyncApplicationEventSuccessfully();
        completeUnsubscribeApplicationEventSuccessfully();
        consumer.close(Duration.ZERO);

        Assertions.assertEquals(1, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
        Assertions.assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
    }

    /** A successful {@code commitSync} must bump the interceptor's commit counter to one. */
    @Test
    public void testInterceptorCommitSync() {
        Properties props = requiredConsumerConfigAndGroupId("test-id");
        props.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName());
        props.setProperty("enable.auto.commit", "false");

        consumer = newConsumer(props);
        Assertions.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());

        completeCommitSyncApplicationEventSuccessfully();
        consumer.commitSync(mockTopicPartitionOffset());

        Assertions.assertEquals(1, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
    }

    /**
     * A {@code commitSync} that fails must rethrow the failure and leave the interceptor's
     * commit counter at zero.
     */
    @Test
    public void testNoInterceptorCommitSyncFailed() {
        Properties props = requiredConsumerConfigAndGroupId("test-id");
        props.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName());
        props.setProperty("enable.auto.commit", "false");

        consumer = newConsumer(props);
        Assertions.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());

        KafkaException commitFailure = new KafkaException("Test exception");
        completeCommitSyncApplicationEventExceptionally(commitFailure);
        KafkaException thrown = Assertions.assertThrows(
            KafkaException.class,
            () -> consumer.commitSync(mockTopicPartitionOffset()));

        Assertions.assertEquals(commitFailure, thrown);
        Assertions.assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
    }

    /**
     * For a successful {@code commitAsync}, the interceptor's commit counter stays at zero until
     * the commit callbacks are forced to run, after which it is one.
     */
    @Test
    public void testInterceptorCommitAsync() {
        Properties props = requiredConsumerConfigAndGroupId("test-id");
        props.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName());
        props.setProperty("enable.auto.commit", "false");

        consumer = newConsumer(props);
        Assertions.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());

        completeCommitAsyncApplicationEventSuccessfully();
        consumer.commitAsync(mockTopicPartitionOffset(), new MockCommitCallback());
        Assertions.assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get());

        forceCommitCallbackInvocation();
        Assertions.assertEquals(1, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
    }

    /**
     * For a failed {@code commitAsync}, the interceptor's commit counter must stay at zero both
     * before and after the commit callbacks are forced to run.
     */
    @Test
    public void testNoInterceptorCommitAsyncFailed() {
        Properties props = requiredConsumerConfigAndGroupId("test-id");
        props.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName());
        props.setProperty("enable.auto.commit", "false");

        consumer = newConsumer(props);
        Assertions.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());

        completeCommitAsyncApplicationEventExceptionally(new KafkaException("Test exception"));
        consumer.commitAsync(mockTopicPartitionOffset(), new MockCommitCallback());
        Assertions.assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get());

        forceCommitCallbackInvocation();
        Assertions.assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
    }

    /**
     * Runs the shared fetch-committed-offsets timeout scenario, with argument {@code true},
     * against a consumer built by the no-argument factory.
     */
    @Test
    public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() {
        consumer = newConsumer();

        testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(true);
    }

    /**
     * Runs the shared fetch-committed-offsets timeout scenario, with argument {@code false},
     * against a consumer created without a group id.
     */
    @Test
    public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
        consumer = newConsumerWithoutGroupId();

        testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(false);
    }

    /**
     * Subscribing to a topic list records the topic in the subscription, leaves the assignment
     * empty, and submits a {@code TopicSubscriptionChangeEvent}.
     */
    @Test
    public void testSubscribeGeneratesEvent() {
        consumer = newConsumer();
        String topic = "topic1";
        completeTopicSubscriptionChangeEventSuccessfully();

        consumer.subscribe(Collections.singletonList(topic));

        Assertions.assertEquals(Collections.singleton(topic), consumer.subscription());
        Assertions.assertTrue(consumer.assignment().isEmpty());
        Mockito.verify(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(TopicSubscriptionChangeEvent.class));
    }

    /** Subscribing with a regex pattern submits a {@code TopicPatternSubscriptionChangeEvent}. */
    @Test
    public void testSubscribePatternGeneratesEvent() {
        consumer = newConsumer();
        Pattern topicPattern = Pattern.compile("topic.*");
        completeTopicPatternSubscriptionChangeEventSuccessfully();

        consumer.subscribe(topicPattern);

        Mockito.verify(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(TopicPatternSubscriptionChangeEvent.class));
    }

    /**
     * {@code unsubscribe()} clears subscription and assignment and enqueues an
     * {@code UnsubscribeEvent} whose deadline is no later than the current time plus the
     * default value of {@code default.api.timeout.ms}.
     */
    @Test
    public void testUnsubscribeGeneratesUnsubscribeEvent() {
        consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();

        consumer.unsubscribe();

        Assertions.assertTrue(consumer.subscription().isEmpty());
        Assertions.assertTrue(consumer.assignment().isEmpty());

        ArgumentCaptor<UnsubscribeEvent> eventCaptor = ArgumentCaptor.forClass(UnsubscribeEvent.class);
        Mockito.verify(applicationEventHandler).add(eventCaptor.capture());

        long eventDeadlineMs = eventCaptor.getValue().deadlineMs();
        long nowMs = time.milliseconds();
        int defaultApiTimeoutMs = (Integer) ConsumerConfig.configDef().defaultValues().get("default.api.timeout.ms");
        Assertions.assertTrue(eventDeadlineMs <= nowMs + (long) defaultApiTimeoutMs);
    }

    /**
     * Subscribing to an empty topic list leaves subscription and assignment empty and enqueues
     * an {@code UnsubscribeEvent}.
     */
    @Test
    public void testSubscribeToEmptyListActsAsUnsubscribe() {
        consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();

        consumer.subscribe(Collections.emptyList());

        Assertions.assertTrue(consumer.subscription().isEmpty());
        Assertions.assertTrue(consumer.assignment().isEmpty());
        Mockito.verify(applicationEventHandler)
            .add(ArgumentMatchers.isA(UnsubscribeEvent.class));
    }

    /** Subscribing with a {@code null} topic list must throw {@code IllegalArgumentException}. */
    @Test
    public void testSubscribeToNullTopicCollection() {
        consumer = newConsumer();
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> consumer.subscribe((List<String>) null));
    }

    /** A topic list containing a {@code null} element must be rejected by {@code subscribe}. */
    @Test
    public void testSubscriptionOnNullTopic() {
        consumer = newConsumer();
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> consumer.subscribe(Collections.singletonList(null)));
    }

    /** A topic name consisting only of whitespace must be rejected by {@code subscribe}. */
    @Test
    public void testSubscriptionOnEmptyTopic() {
        consumer = newConsumer();
        String blankTopic = "  ";
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> consumer.subscribe(Collections.singletonList(blankTopic)));
    }

    /**
     * Without a group id, the two listed configs must not be reported as unused, and
     * {@code groupMetadata()} must throw {@code InvalidGroupIdException} with the expected message.
     */
    @Test
    public void testGroupMetadataAfterCreationWithGroupIdIsNull() {
        ConsumerConfig config = new ConsumerConfig(TestUtils.requiredConsumerConfig());
        consumer = newConsumer(config);

        Assertions.assertFalse(config.unused().contains("auto.commit.interval.ms"));
        Assertions.assertFalse(config.unused().contains("internal.throw.on.fetch.stable.offset.unsupported"));

        Throwable thrown = Assertions.assertThrows(InvalidGroupIdException.class, consumer::groupMetadata);
        Assertions.assertEquals(
            "To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.",
            thrown.getMessage());
    }

    /**
     * Right after creation with a group id, the group metadata carries that id, no group
     * instance id, generation -1 and an empty member id.
     */
    @Test
    public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
        String groupId = "consumerGroupA";
        consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));

        ConsumerGroupMetadata metadataAfterCreation = consumer.groupMetadata();

        Assertions.assertEquals(groupId, metadataAfterCreation.groupId());
        Assertions.assertEquals(Optional.empty(), metadataAfterCreation.groupInstanceId());
        Assertions.assertEquals(-1, metadataAfterCreation.generationId());
        Assertions.assertEquals("", metadataAfterCreation.memberId());
    }

    /**
     * Right after creation with a group id and a group instance id, the group metadata carries
     * both, together with generation -1 and an empty member id.
     */
    @Test
    public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceIdSet() {
        String groupId = "consumerGroupA";
        String groupInstanceId = "groupInstanceId1";
        Properties props = requiredConsumerConfigAndGroupId(groupId);
        props.put("group.instance.id", groupInstanceId);
        consumer = newConsumer(props);

        ConsumerGroupMetadata metadataAfterCreation = consumer.groupMetadata();

        Assertions.assertEquals(groupId, metadataAfterCreation.groupId());
        Assertions.assertEquals(Optional.of(groupInstanceId), metadataAfterCreation.groupInstanceId());
        Assertions.assertEquals(-1, metadataAfterCreation.generationId());
        Assertions.assertEquals("", metadataAfterCreation.memberId());
    }

    /**
     * Captures the {@code MemberStateListener} that was passed as the last argument to the
     * statically mocked {@code RequestManagers.supplier(...)}.
     *
     * @param mockedStatic the active static mock of {@code RequestManagers}
     * @return the listener value recorded by the argument captor
     */
    private MemberStateListener captureGroupMetadataUpdateListener(MockedStatic<RequestManagers> mockedStatic) {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(MemberStateListener.class);
        // Verify the static call with wildcard matchers for every argument except the last one,
        // which is captured.
        mockedStatic.verify(() -> {
            RequestManagers.supplier((Time) ArgumentMatchers.any(), (LogContext) ArgumentMatchers.any(), (BackgroundEventHandler) ArgumentMatchers.any(), (ConsumerMetadata) ArgumentMatchers.any(), (SubscriptionState) ArgumentMatchers.any(), (FetchBuffer) ArgumentMatchers.any(), (ConsumerConfig) ArgumentMatchers.any(), (GroupRebalanceConfig) ArgumentMatchers.any(), (ApiVersions) ArgumentMatchers.any(), (FetchMetricsManager) ArgumentMatchers.any(), (Supplier) ArgumentMatchers.any(), (Optional) ArgumentMatchers.any(), (Metrics) ArgumentMatchers.any(), (OffsetCommitCallbackInvoker) ArgumentMatchers.any(), (MemberStateListener) forClass.capture());
        });
        return (MemberStateListener) forClass.getValue();
    }

    /**
     * After the captured member-state listener reports epoch 42 and member id
     * {@code "memberId"}, {@code groupMetadata()} must reflect the new generation and member id
     * while keeping the original group id and group instance id.
     */
    @Test
    public void testGroupMetadataUpdate() {
        // try-with-resources closes the static mock on every path, success or failure.
        try (MockedStatic<RequestManagers> requestManagers = Mockito.mockStatic(RequestManagers.class)) {
            consumer = newConsumer(requiredConsumerConfigAndGroupId("consumerGroupA"));
            ConsumerGroupMetadata before = consumer.groupMetadata();

            captureGroupMetadataUpdateListener(requestManagers)
                .onMemberEpochUpdated(Optional.of(42), "memberId");

            ConsumerGroupMetadata after = consumer.groupMetadata();
            Assertions.assertEquals(before.groupId(), after.groupId());
            Assertions.assertEquals("memberId", after.memberId());
            Assertions.assertEquals(42, after.generationId());
            Assertions.assertEquals(before.groupInstanceId(), after.groupInstanceId());
        }
    }

    /**
     * Checks that group metadata goes back to its initial values after {@code unsubscribe()}.
     *
     * <p>First a member-epoch update is pushed through the captured listener and the generation id
     * and member id are asserted to differ from {@code -1} and {@code ""}. After unsubscribing, the
     * metadata must equal {@code ConsumerGroupMetadata("consumerGroupA", -1, "", Optional.empty())}.
     */
    @Test
    public void testGroupMetadataIsResetAfterUnsubscribe() {
        // The static mock is only needed while the consumer is built and the listener is captured.
        // Scoping it to this block closes it exactly once, so a failure in the unsubscribe phase
        // below cannot trigger a second close() on an already-closed mock.
        try (MockedStatic<RequestManagers> requestManagers = Mockito.mockStatic(RequestManagers.class)) {
            this.consumer = newConsumer(requiredConsumerConfigAndGroupId("consumerGroupA"));
            MemberStateListener listener = captureGroupMetadataUpdateListener(requestManagers);
            this.consumer.subscribe(Collections.singletonList("topic"));
            listener.onMemberEpochUpdated(Optional.of(42), "memberId");

            ConsumerGroupMetadata updated = this.consumer.groupMetadata();
            Assertions.assertNotEquals(-1, updated.generationId());
            Assertions.assertNotEquals("", updated.memberId());
        }

        completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.unsubscribe();
        Assertions.assertEquals(new ConsumerGroupMetadata("consumerGroupA", -1, "", Optional.empty()), this.consumer.groupMetadata());
    }

    /**
     * Enqueues one rebalance-callback event per requested method name, polls once, and checks both
     * the poll outcome and how often each listener callback ran.
     *
     * @param list      callback method names to enqueue as background events, in order
     * @param optional  first error argument handed to the {@code CounterConsumerRebalanceListener}
     * @param optional2 second error argument handed to the listener
     * @param optional3 third error argument handed to the listener
     * @param i         expected {@code revokedCount()}
     * @param i2        expected {@code assignedCount()}
     * @param i3        expected {@code lostCount()}
     * @param optional4 exception {@code poll} is expected to throw; empty when it must not throw
     */
    @ParameterizedTest
    @MethodSource("listenerCallbacksInvokeSource")
    public void testListenerCallbacksInvoke(List<ConsumerRebalanceListenerMethodName> list, Optional<RuntimeException> optional, Optional<RuntimeException> optional2, Optional<RuntimeException> optional3, int i, int i2, int i3, Optional<RuntimeException> optional4) {
        this.consumer = newConsumer();
        CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener(optional, optional2, optional3);
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        completeTopicSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(Collections.singletonList("topic"), listener);

        SortedSet<TopicPartition> noPartitions = Collections.emptySortedSet();
        for (ConsumerRebalanceListenerMethodName methodName : list) {
            this.backgroundEventQueue.add(new ConsumerRebalanceListenerCallbackNeededEvent(methodName, noPartitions));
        }
        markReconcileAndAutoCommitCompleteForPollEvent();

        if (!optional4.isPresent()) {
            Mockito.when(this.applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
            Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ZERO));
        } else {
            RuntimeException expected = optional4.get();
            Exception thrown = Assertions.assertThrows(expected.getClass(), () -> this.consumer.poll(Duration.ZERO));
            Assertions.assertEquals(expected.getMessage(), thrown.getMessage());
            Assertions.assertEquals(expected.getCause(), thrown.getCause());
        }

        Assertions.assertEquals(i, listener.revokedCount());
        Assertions.assertEquals(i2, listener.assignedCount());
        Assertions.assertEquals(i3, listener.lostCount());
    }

    /**
     * Argument source for {@code testListenerCallbacksInvoke}.
     *
     * <p>Each row is: callback method names to enqueue, the three optional errors given to the
     * listener, the expected revoked/assigned/lost counts, and the exception expected from
     * {@code poll} (empty when none is expected).
     */
    private static Stream<Arguments> listenerCallbacksInvokeSource() {
        Optional<RuntimeException> none = Optional.empty();
        Optional<RuntimeException> runtimeError = Optional.of(new RuntimeException("Intentional error"));
        Optional<RuntimeException> kafkaError = Optional.of(new KafkaException("Intentional error"));
        Optional<RuntimeException> wrappedError = Optional.of(new KafkaException("User rebalance callback throws an error", runtimeError.get()));

        ConsumerRebalanceListenerMethodName revoked = ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
        ConsumerRebalanceListenerMethodName assigned = ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
        ConsumerRebalanceListenerMethodName lost = ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;

        return Stream.of(
            // No callbacks enqueued.
            Arguments.of(Collections.emptyList(), none, none, none, 0, 0, 0, none),
            // A single callback with no error configured.
            Arguments.of(Collections.singletonList(revoked), none, none, none, 1, 0, 0, none),
            Arguments.of(Collections.singletonList(assigned), none, none, none, 0, 1, 0, none),
            Arguments.of(Collections.singletonList(lost), none, none, none, 0, 0, 1, none),
            // A single callback configured with a plain RuntimeException; the wrapped error is expected.
            Arguments.of(Collections.singletonList(revoked), runtimeError, none, none, 1, 0, 0, wrappedError),
            Arguments.of(Collections.singletonList(assigned), none, runtimeError, none, 0, 1, 0, wrappedError),
            Arguments.of(Collections.singletonList(lost), none, none, runtimeError, 0, 0, 1, wrappedError),
            // A single callback configured with a KafkaException; the same error is expected.
            Arguments.of(Collections.singletonList(revoked), kafkaError, none, none, 1, 0, 0, kafkaError),
            Arguments.of(Collections.singletonList(assigned), none, kafkaError, none, 0, 1, 0, kafkaError),
            Arguments.of(Collections.singletonList(lost), none, none, kafkaError, 0, 0, 1, kafkaError),
            // Two callbacks enqueued; both counters are expected to be 1.
            Arguments.of(Arrays.asList(revoked, assigned), runtimeError, none, none, 1, 1, 0, wrappedError),
            Arguments.of(Arrays.asList(revoked, assigned), kafkaError, runtimeError, none, 1, 1, 0, kafkaError));
    }

    /**
     * An {@code ErrorEvent} sitting in the background queue must make {@code poll} throw a
     * {@code KafkaException} carrying the same message.
     */
    @Test
    public void testBackgroundError() {
        this.consumer = newConsumer(requiredConsumerConfigAndGroupId("consumerGroupA"));
        KafkaException expected = new KafkaException("Nobody expects the Spanish Inquisition");
        this.backgroundEventQueue.add(new ErrorEvent(expected));

        completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singletonList(new TopicPartition("topic", 0)));
        markReconcileAndAutoCommitCompleteForPollEvent();

        KafkaException thrown = Assertions.assertThrows(KafkaException.class, () -> this.consumer.poll(Duration.ZERO));
        Assertions.assertEquals(expected.getMessage(), thrown.getMessage());
    }

    /**
     * With two {@code ErrorEvent}s queued, {@code poll} must throw with the message of the first
     * one, and the background queue must be empty afterwards.
     */
    @Test
    public void testMultipleBackgroundErrors() {
        this.consumer = newConsumer(requiredConsumerConfigAndGroupId("consumerGroupA"));
        KafkaException firstError = new KafkaException("Nobody expects the Spanish Inquisition");
        KafkaException secondError = new KafkaException("Spam, Spam, Spam");
        this.backgroundEventQueue.add(new ErrorEvent(firstError));
        this.backgroundEventQueue.add(new ErrorEvent(secondError));

        completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singletonList(new TopicPartition("topic", 0)));
        markReconcileAndAutoCommitCompleteForPollEvent();

        KafkaException thrown = Assertions.assertThrows(KafkaException.class, () -> this.consumer.poll(Duration.ZERO));
        Assertions.assertEquals(firstError.getMessage(), thrown.getMessage());
        Assertions.assertTrue(this.backgroundEventQueue.isEmpty());
    }

    /**
     * Without a {@code group.id}, {@code group.remote.assignor} must be reported as unused by the
     * config after the consumer has been built.
     */
    @Test
    public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("group.remote.assignor", "someAssignor");
        props.put("group.protocol", GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));

        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = newConsumer(config);

        Assertions.assertTrue(config.unused().contains("group.remote.assignor"));
    }

    /**
     * Setting {@code group.remote.assignor} together with the classic group protocol must make
     * {@code ConsumerConfig} construction fail with a {@code ConfigException}.
     */
    @Test
    public void testGroupRemoteAssignorInClassicProtocol() {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("group.id", "consumerGroupA");
        props.put("group.protocol", GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT));
        props.put("group.remote.assignor", "someAssignor");

        Assertions.assertThrows(ConfigException.class, () -> new ConsumerConfig(props));
    }

    /**
     * With a {@code group.id} and the consumer group protocol, {@code group.remote.assignor} must
     * not be reported as unused after the consumer has been built.
     */
    @Test
    public void testGroupRemoteAssignorUsedInConsumerProtocol() {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("group.id", "consumerGroupA");
        props.put("group.protocol", GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
        props.put("group.remote.assignor", "someAssignor");
        props.put("enable.auto.commit", false);

        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = newConsumer(config);

        Assertions.assertFalse(config.unused().contains("group.remote.assignor"));
    }

    /**
     * Builds a consumer without a {@code group.id} and asserts that neither
     * {@code auto.commit.interval.ms} nor {@code internal.throw.on.fetch.stable.offset.unsupported}
     * is listed among the unused configs.
     */
    @Test
    public void testGroupIdNull() {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("auto.commit.interval.ms", 10000);
        props.put("internal.throw.on.fetch.stable.offset.unsupported", true);

        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = newConsumer(config);

        Assertions.assertFalse(config.unused().contains("auto.commit.interval.ms"));
        Assertions.assertFalse(config.unused().contains("internal.throw.on.fetch.stable.offset.unsupported"));
    }

    /**
     * Builds a consumer with a valid {@code group.id} and auto-commit disabled, then asserts that
     * both {@code auto.commit.interval.ms} and
     * {@code internal.throw.on.fetch.stable.offset.unsupported} are listed among the unused configs.
     */
    @Test
    public void testGroupIdNotNullAndValid() {
        Properties props = requiredConsumerConfigAndGroupId("consumerGroupA");
        props.put("auto.commit.interval.ms", 10000);
        props.put("internal.throw.on.fetch.stable.offset.unsupported", true);
        props.put("enable.auto.commit", false);

        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = newConsumer(config);

        Assertions.assertTrue(config.unused().contains("auto.commit.interval.ms"));
        Assertions.assertTrue(config.unused().contains("internal.throw.on.fetch.stable.offset.unsupported"));
    }

    /**
     * A single {@code poll} must hand both a {@code PollEvent} and a
     * {@code CreateFetchRequestsEvent} to the application event handler.
     */
    @Test
    public void testEnsurePollEventSentOnConsumerPoll() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = newConsumer(
            Mockito.mock(FetchBuffer.class),
            new ConsumerInterceptors<>(Collections.emptyList()),
            Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            "group-id",
            "client-id",
            false);

        // The fetch collector is stubbed to return one record for partition topic-0.
        TopicPartition partition = new TopicPartition("topic", 0);
        List<ConsumerRecord<String, String>> records = Collections.singletonList(new ConsumerRecord<>("topic", 0, 2L, "key1", "value1"));
        Mockito.doAnswer(invocation -> Fetch.forPartition(partition, records, true, new OffsetAndMetadata(3L, Optional.of(0), "")))
            .when(this.fetchCollector)
            .collectFetch(Mockito.any(FetchBuffer.class));
        Mockito.when(this.applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);

        completeTopicSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(Collections.singletonList("topic1"));
        markReconcileAndAutoCommitCompleteForPollEvent();
        this.consumer.poll(Duration.ofMillis(100L));

        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(PollEvent.class));
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(CreateFetchRequestsEvent.class));
    }

    /**
     * Returns {@code TestUtils.requiredConsumerConfig()} with {@code group.id} added.
     *
     * @param str value to store under {@code group.id}
     * @return the properties from {@code TestUtils.requiredConsumerConfig()} with the group id set
     */
    private Properties requiredConsumerConfigAndGroupId(String str) {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("group.id", str);
        return props;
    }

    /**
     * Shared scenario: the committed-offset fetch is stubbed to fail with a
     * {@code TimeoutException}, one partition is assigned, and a zero-duration {@code poll} must
     * still submit at least one {@code CheckAndUpdatePositionsEvent}.
     *
     * <p>Uses {@code this.consumer} without creating it, so it must already be set when this runs.
     *
     * @param z not read by this method
     */
    private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean z) {
        completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException());
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when(this.applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);

        completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(new TopicPartition("t1", 1)));
        markReconcileAndAutoCommitCompleteForPollEvent();
        this.consumer.poll(Duration.ZERO);

        Mockito.verify(this.applicationEventHandler, Mockito.atLeast(1))
            .addAndGet(ArgumentMatchers.isA(CheckAndUpdatePositionsEvent.class));
    }

    /**
     * Polls with a 10-second timeout while the fetch collector is stubbed to return an empty fetch
     * first (assigning the partition as a side effect) and two records on the second call. The
     * poll must return those two records with the stubbed next offset and leader epoch.
     */
    @Test
    public void testLongPollWaitIsLimited() {
        this.consumer = newConsumer();
        completeTopicSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(Collections.singletonList("topic1"));
        Assertions.assertEquals(Collections.singleton("topic1"), this.consumer.subscription());
        Assertions.assertTrue(this.consumer.assignment().isEmpty());

        TopicPartition partition = new TopicPartition("topic1", 3);
        List<ConsumerRecord<String, String>> records = Arrays.asList(
            new ConsumerRecord<>("topic1", 3, 2L, "key1", "value1"),
            new ConsumerRecord<>("topic1", 3, 3L, "key2", "value2"));
        OffsetAndMetadata nextOffset = new OffsetAndMetadata(4L, Optional.of(0), "");
        Set<TopicPartition> assigned = Collections.singleton(partition);

        Mockito.doAnswer(firstCall -> {
            // First collection attempt: the partition gets assigned, but no data is returned yet.
            this.consumer.subscriptions().assignFromSubscribed(assigned);
            this.consumer.setGroupAssignmentSnapshot(assigned);
            return Fetch.empty();
        }).doAnswer(secondCall -> Fetch.forPartition(partition, records, true, nextOffset))
            .when(this.fetchCollector)
            .collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when(this.applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
        markReconcileAndAutoCommitCompleteForPollEvent();

        ConsumerRecords<String, String> polled = this.consumer.poll(Duration.ofMillis(10000L));

        Assertions.assertEquals(2, polled.count());
        Assertions.assertEquals(4L, polled.nextOffsets().get(partition).offset());
        Assertions.assertEquals(Optional.of(0), polled.nextOffsets().get(partition).leaderEpoch());
        Assertions.assertEquals(Collections.singleton("topic1"), this.consumer.subscription());
        Assertions.assertEquals(assigned, this.consumer.assignment());
    }

    /**
     * The mocked future times out on its first two {@code get} calls, each time sleeping the timer
     * for the requested wait, and completes on the third. After {@code processBackgroundEvents}
     * returns, 800 ms must remain on the 1000 ms timer.
     */
    @Test
    public void testProcessBackgroundEventsWithInitialDelay() throws Exception {
        this.consumer = newConsumer();
        Timer timer = this.time.timer(1000L);
        CompletableFuture<?> future = Mockito.mock(CompletableFuture.class);
        CountDownLatch attemptsLeft = new CountDownLatch(3);

        Mockito.doAnswer(invocation -> {
            attemptsLeft.countDown();
            if (attemptsLeft.getCount() <= 0) {
                // Final attempt: no sleep and no timeout.
                future.complete(null);
                return null;
            }
            long waitMs = invocation.getArgument(0, Long.class);
            timer.sleep(waitMs);
            throw new java.util.concurrent.TimeoutException("Intentional timeout");
        }).when(future).get(ArgumentMatchers.any(Long.class), ArgumentMatchers.any(TimeUnit.class));

        this.consumer.processBackgroundEvents(future, timer, error -> false);

        Assertions.assertEquals(800L, timer.remainingMs());
    }

    /**
     * With an already-completed future, {@code processBackgroundEvents} must leave the full
     * 1000 ms on the timer.
     */
    @Test
    public void testProcessBackgroundEventsWithoutDelay() {
        this.consumer = newConsumer();
        Timer timer = this.time.timer(1000L);
        CompletableFuture<Object> alreadyDone = CompletableFuture.completedFuture(null);

        this.consumer.processBackgroundEvents(alreadyDone, timer, error -> false);

        Assertions.assertEquals(1000L, timer.remainingMs());
    }

    /**
     * Every {@code get} on the mocked future sleeps the timer for the requested wait and then
     * times out. {@code processBackgroundEvents} must throw {@code TimeoutException} and leave the
     * timer with 0 ms remaining.
     */
    @Test
    public void testProcessBackgroundEventsTimesOut() throws Exception {
        this.consumer = newConsumer();
        Timer timer = this.time.timer(1000L);
        CompletableFuture<?> future = Mockito.mock(CompletableFuture.class);

        Mockito.doAnswer(invocation -> {
            long waitMs = invocation.getArgument(0, Long.class);
            timer.sleep(waitMs);
            throw new java.util.concurrent.TimeoutException("Intentional timeout");
        }).when(future).get(ArgumentMatchers.any(Long.class), ArgumentMatchers.any(TimeUnit.class));

        Assertions.assertThrows(TimeoutException.class, () -> this.consumer.processBackgroundEvents(future, timer, error -> false));
        Assertions.assertEquals(0L, timer.remainingMs());
    }

    /**
     * {@code poll} must throw {@code InterruptException} when the calling thread's interrupt flag
     * is set, and must work again once the flag has been cleared.
     */
    @Test
    public void testPollThrowsInterruptExceptionIfInterrupted() {
        this.consumer = newConsumer();
        TopicPartition partition = new TopicPartition("foo", 3);
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when(this.applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(this.metadata).currentLeader(ArgumentMatchers.any());
        completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(partition));

        try {
            Thread.currentThread().interrupt();
            markReconcileAndAutoCommitCompleteForPollEvent();
            Assertions.assertThrows(InterruptException.class, () -> this.consumer.poll(Duration.ZERO));
        } finally {
            // Always clear the interrupt flag, whether or not the assertion passed, so it
            // cannot leak into the follow-up poll below or into later tests on this thread.
            Thread.interrupted();
        }

        Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ZERO));
    }

    /** Closing the consumer must call {@code reap} on the background event reaper with the background event queue. */
    @Test
    void testReaperInvokedInClose() {
        this.consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.close();

        Mockito.verify(this.backgroundEventReaper).reap(this.backgroundEventQueue);
    }

    /** Unsubscribing must call {@code reap} on the background event reaper with the current time in milliseconds. */
    @Test
    void testReaperInvokedInUnsubscribe() {
        this.consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.unsubscribe();

        Mockito.verify(this.backgroundEventReaper).reap(this.time.milliseconds());
    }

    /** Polling must call {@code reap} on the background event reaper with the current time in milliseconds. */
    @Test
    void testReaperInvokedInPoll() {
        this.consumer = newConsumer();
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        completeTopicSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(Collections.singletonList("topic"));
        Mockito.when(this.applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
        markReconcileAndAutoCommitCompleteForPollEvent();

        this.consumer.poll(Duration.ZERO);

        Mockito.verify(this.backgroundEventReaper).reap(this.time.milliseconds());
    }

    /**
     * Unsubscribing a consumer built by {@code newConsumerWithoutGroupId()} must still submit an
     * {@code UnsubscribeEvent} to the application event handler.
     */
    @Test
    public void testUnsubscribeWithoutGroupId() {
        this.consumer = newConsumerWithoutGroupId();
        completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.unsubscribe();

        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
    }

    /**
     * {@code seekToBeginning} must enqueue a {@code ResetOffsetEvent} for the given partitions
     * with the {@code EARLIEST} reset strategy.
     */
    @Test
    public void testSeekToBeginning() {
        Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("test", 0));
        this.consumer = newConsumer();

        this.consumer.seekToBeginning(partitions);

        ResetOffsetEvent event = Assertions.assertInstanceOf(ResetOffsetEvent.class, addAndGetLastEnqueuedEvent());
        Assertions.assertEquals(partitions, new HashSet<>(event.topicPartitions()));
        Assertions.assertEquals(AutoOffsetResetStrategy.EARLIEST, event.offsetResetStrategy());
    }

    /**
     * When submitting the {@code ResetOffsetEvent} throws a {@code TimeoutException},
     * {@code seekToBeginning} must propagate it.
     */
    @Test
    public void testSeekToBeginningWithException() {
        Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("test", 0));
        this.consumer = newConsumer();
        completeResetOffsetEventExceptionally(new TimeoutException());

        Assertions.assertThrows(TimeoutException.class, () -> this.consumer.seekToBeginning(partitions));
    }

    /**
     * When submitting the {@code ResetOffsetEvent} throws a {@code TimeoutException},
     * {@code seekToEnd} must propagate it.
     */
    @Test
    public void testSeekToEndWithException() {
        Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("test", 0));
        this.consumer = newConsumer();
        completeResetOffsetEventExceptionally(new TimeoutException());

        Assertions.assertThrows(TimeoutException.class, () -> this.consumer.seekToEnd(partitions));
    }

    /**
     * {@code seekToEnd} must enqueue a {@code ResetOffsetEvent} for the given partitions with the
     * {@code LATEST} reset strategy.
     */
    @Test
    public void testSeekToEnd() {
        Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("test", 0));
        this.consumer = newConsumer();

        this.consumer.seekToEnd(partitions);

        ResetOffsetEvent event = Assertions.assertInstanceOf(ResetOffsetEvent.class, addAndGetLastEnqueuedEvent());
        Assertions.assertEquals(partitions, new HashSet<>(event.topicPartitions()));
        Assertions.assertEquals(AutoOffsetResetStrategy.LATEST, event.offsetResetStrategy());
    }

    /**
     * A poll under manual assignment must not submit an {@code UpdatePatternSubscriptionEvent};
     * after switching to a pattern subscription, the next poll must submit one.
     */
    @Test
    public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() {
        this.consumer = newConsumer();
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when(this.applicationEventHandler.addAndGet(ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(this.metadata).currentLeader(ArgumentMatchers.any());
        completeAssignmentChangeEventSuccessfully();
        completeTopicPatternSubscriptionChangeEventSuccessfully();
        completeUnsubscribeApplicationEventSuccessfully();

        // Phase 1: manual assignment, so no pattern-update event is expected.
        this.consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
        markReconcileAndAutoCommitCompleteForPollEvent();
        this.consumer.poll(Duration.ZERO);
        Mockito.verify(this.applicationEventHandler, Mockito.never())
            .addAndGet(ArgumentMatchers.any(UpdatePatternSubscriptionEvent.class));

        // Phase 2: pattern subscription, so exactly one pattern-update event is expected.
        this.consumer.unsubscribe();
        this.consumer.subscribe(Pattern.compile("t*"));
        this.consumer.poll(Duration.ZERO);
        Mockito.verify(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(UpdatePatternSubscriptionEvent.class));
    }

    /**
     * Argument validation for {@code subscribe(SubscriptionPattern[, listener])}: a null pattern,
     * an empty pattern, and a null listener are rejected with {@code IllegalArgumentException};
     * a non-empty pattern is accepted, with or without a listener.
     */
    @Test
    public void testSubscribeToRe2JPatternValidation() {
        this.consumer = newConsumer();

        IllegalArgumentException nullPattern = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> this.consumer.subscribe((SubscriptionPattern) null));
        Assertions.assertEquals("Topic pattern to subscribe to cannot be null", nullPattern.getMessage());

        IllegalArgumentException emptyPattern = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> this.consumer.subscribe(new SubscriptionPattern("")));
        Assertions.assertEquals("Topic pattern to subscribe to cannot be empty", emptyPattern.getMessage());

        Assertions.assertDoesNotThrow(() -> this.consumer.subscribe(new SubscriptionPattern("t*")));

        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> this.consumer.subscribe(new SubscriptionPattern("t*"), (ConsumerRebalanceListener) null));

        Assertions.assertDoesNotThrow(
            () -> this.consumer.subscribe(new SubscriptionPattern("t*"), Mockito.mock(ConsumerRebalanceListener.class)));
    }

    /**
     * Subscribing with a {@code SubscriptionPattern} on a consumer configured without a group id
     * must throw {@code InvalidGroupIdException}, with or without a rebalance listener.
     */
    @Test
    public void testSubscribeToRe2JPatternThrowsIfNoGroupId() {
        this.consumer = newConsumer(TestUtils.requiredConsumerConfig());

        Assertions.assertThrows(
            InvalidGroupIdException.class,
            () -> this.consumer.subscribe(new SubscriptionPattern("t*")));
        Assertions.assertThrows(
            InvalidGroupIdException.class,
            () -> this.consumer.subscribe(new SubscriptionPattern("t*"), Mockito.mock(ConsumerRebalanceListener.class)));
    }

    /**
     * Both {@code SubscriptionPattern} subscribe overloads must submit a
     * {@code TopicRe2JPatternSubscriptionChangeEvent} through {@code addAndGet}.
     */
    @Test
    public void testSubscribeToRe2JPatternGeneratesEvent() {
        this.consumer = newConsumer();
        completeTopicRe2JPatternSubscriptionChangeEventSuccessfully();

        this.consumer.subscribe(new SubscriptionPattern("t*"));
        Mockito.verify(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class));

        // Forget the first call so the second verification only sees the listener overload.
        Mockito.clearInvocations(this.applicationEventHandler);

        this.consumer.subscribe(new SubscriptionPattern("t*"), Mockito.mock(ConsumerRebalanceListener.class));
        Mockito.verify(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class));
    }

    /**
     * Subscribes with a {@code SubscriptionPattern} against a {@code MockClient} whose advertised
     * {@code CONSUMER_GROUP_HEARTBEAT} version range is 0..0, then polls until
     * {@code UnsupportedVersionException} is thrown.
     */
    @Test
    public void testSubscribePatternAgainstBrokerNotSupportingRegex() throws InterruptedException {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("group.id", "group-id");
        props.put("enable.auto.commit", "false");
        ConsumerConfig config = new ConsumerConfig(props);

        ConsumerMetadata consumerMetadata = new ConsumerMetadata(
            0L,
            0L,
            Long.MAX_VALUE,
            false,
            false,
            Mockito.mock(SubscriptionState.class),
            new LogContext(),
            new ClusterResourceListeners());
        MockClient client = new MockClient(this.time, (Metadata) consumerMetadata);
        Map<String, Integer> partitionCounts = Map.of("topic1", 2);
        Map<String, Uuid> topicIds = Map.of("topic1", Uuid.randomUuid());
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, partitionCounts, topicIds));
        client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.CONSUMER_GROUP_HEARTBEAT.id, (short) 0, (short) 0));

        // Queue the coordinator lookup response, then a heartbeat response from the coordinator node.
        Node broker = consumerMetadata.fetch().nodes().get(0);
        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", broker), broker);
        Node coordinator = new Node(Integer.MAX_VALUE - broker.id(), broker.host(), broker.port());
        ConsumerGroupHeartbeatResponseData heartbeatData = new ConsumerGroupHeartbeatResponseData().setMemberId("").setMemberEpoch(0);
        client.prepareResponseFrom(new ConsumerGroupHeartbeatResponse(heartbeatData), coordinator);

        SubscriptionState subscriptions = Mockito.mock(SubscriptionState.class);
        this.consumer = new AsyncKafkaConsumer<>(new LogContext(), this.time, config, new StringDeserializer(), new StringDeserializer(), client, subscriptions, consumerMetadata);
        completeTopicRe2JPatternSubscriptionChangeEventSuccessfully();

        SubscriptionPattern pattern = new SubscriptionPattern("t*");
        this.consumer.subscribe(pattern);
        Mockito.when(subscriptions.subscriptionPattern()).thenReturn(pattern);

        TestUtils.waitForCondition(() -> {
            try {
                this.consumer.poll(Duration.ZERO);
            } catch (UnsupportedVersionException e) {
                // The expected failure has surfaced on the application thread.
                return true;
            }
            return false;
        }, "Consumer did not throw the expected UnsupportedVersionException on poll");
    }

    /**
     * Enqueues one background event, advances the clock by 10 ms, processes the queue, and checks
     * the {@code background-event-queue-size} metric (0) and the
     * {@code background-event-queue-time-avg}/{@code -max} metrics (10).
     */
    @Test
    public void testRecordBackgroundEventQueueSizeAndBackgroundEventQueueTime() {
        this.consumer = newConsumer(
            Mockito.mock(FetchBuffer.class),
            Mockito.mock(ConsumerInterceptors.class),
            Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            Mockito.mock(SubscriptionState.class),
            "group-id",
            "client-id",
            false);
        Metrics registry = this.consumer.metricsRegistry();
        AsyncConsumerMetrics consumerMetrics = this.consumer.kafkaConsumerMetrics();

        BackgroundEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, Collections.emptySortedSet());
        event.setEnqueuedMs(this.time.milliseconds());
        this.backgroundEventQueue.add(event);
        consumerMetrics.recordBackgroundEventQueueSize(1);

        this.time.sleep(10L);
        this.consumer.processBackgroundEvents();

        double queueSize = (Double) registry.metric(registry.metricName("background-event-queue-size", "consumer-metrics")).metricValue();
        double queueTimeAvg = (Double) registry.metric(registry.metricName("background-event-queue-time-avg", "consumer-metrics")).metricValue();
        double queueTimeMax = (Double) registry.metric(registry.metricName("background-event-queue-time-max", "consumer-metrics")).metricValue();
        Assertions.assertEquals(0.0d, queueSize);
        Assertions.assertEquals(10.0d, queueTimeAvg);
        Assertions.assertEquals(10.0d, queueTimeMax);
    }

    /**
     * Builds a two-entry offset fixture: partition {@code t0-2} at offset 10 and {@code t0-3} at
     * offset 20.
     *
     * @return a new mutable map holding the two entries
     */
    private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("t0", 2), new OffsetAndMetadata(10L));
        offsets.put(new TopicPartition("t0", 3), new OffsetAndMetadata(20L));
        return offsets;
    }

    /**
     * Builds a two-entry fixture mapping {@code t0-2} to
     * {@code OffsetAndTimestampInternal(5, 1, empty)} and {@code t0-3} to
     * {@code OffsetAndTimestampInternal(6, 3, empty)}.
     *
     * @return a new mutable map holding the two entries
     */
    private Map<TopicPartition, OffsetAndTimestampInternal> mockOffsetAndTimestamp() {
        Map<TopicPartition, OffsetAndTimestampInternal> result = new HashMap<>();
        result.put(new TopicPartition("t0", 2), new OffsetAndTimestampInternal(5L, 1L, Optional.empty()));
        result.put(new TopicPartition("t0", 3), new OffsetAndTimestampInternal(6L, 3L, Optional.empty()));
        return result;
    }

    /**
     * Builds a fixture map of {@code Long} values keyed by two partitions of topic {@code t0}.
     *
     * @return a mutable map holding partition 2 mapped to {@code 1L} and partition 3 mapped to {@code 2L}
     */
    private Map<TopicPartition, Long> mockTimestampToSearch() {
        // Parameterized rather than raw so the compiler checks key/value types and the
        // return statement no longer relies on an unchecked conversion.
        Map<TopicPartition, Long> timestamps = new HashMap<>();
        timestamps.put(new TopicPartition("t0", 2), 1L);
        timestamps.put(new TopicPartition("t0", 3), 2L);
        return timestamps;
    }

    /**
     * Stubs the application event handler so that every {@link AsyncCommitEvent} passed to
     * {@code add} has its offsets marked ready and its future completed exceptionally.
     *
     * @param exc the exception used to fail the event's future
     */
    private void completeCommitAsyncApplicationEventExceptionally(Exception exc) {
        Mockito.doAnswer(invocation -> {
            AsyncCommitEvent event = invocation.getArgument(0);
            event.markOffsetsReady();
            event.future().completeExceptionally(exc);
            return null;
        })
            .when(applicationEventHandler)
            .add(ArgumentMatchers.isA(AsyncCommitEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@link SyncCommitEvent} passed to
     * {@code add} has its offsets marked ready and its future completed exceptionally.
     *
     * @param exc the exception used to fail the event's future
     */
    private void completeCommitSyncApplicationEventExceptionally(Exception exc) {
        Mockito.doAnswer(invocation -> {
            SyncCommitEvent event = invocation.getArgument(0);
            event.markOffsetsReady();
            event.future().completeExceptionally(exc);
            return null;
        })
            .when(applicationEventHandler)
            .add(ArgumentMatchers.isA(SyncCommitEvent.class));
    }

    /**
     * Stubs the application event handler so that {@code addAndGet} throws the given exception
     * whenever it is called with a {@link ResetOffsetEvent}.
     *
     * @param exc the exception the stubbed call throws
     */
    private void completeResetOffsetEventExceptionally(Exception exc) {
        Mockito.doThrow(exc)
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(ResetOffsetEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@link AsyncCommitEvent} passed to
     * {@code add} has its offsets marked ready and its future completed with {@code null}.
     */
    private void completeCommitAsyncApplicationEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            AsyncCommitEvent event = invocation.getArgument(0);
            event.markOffsetsReady();
            event.future().complete(null);
            return null;
        })
            .when(applicationEventHandler)
            .add(ArgumentMatchers.isA(AsyncCommitEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@link SyncCommitEvent} passed to
     * {@code add} has its offsets marked ready and its future completed with {@code null}.
     */
    private void completeCommitSyncApplicationEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            SyncCommitEvent event = invocation.getArgument(0);
            event.markOffsetsReady();
            event.future().complete(null);
            return null;
        })
            .when(applicationEventHandler)
            .add(ArgumentMatchers.isA(SyncCommitEvent.class));
    }

    /**
     * Stubs both entry points of the application event handler for
     * {@link FetchCommittedOffsetsEvent}: {@code addAndGet} returns the given map directly, and
     * {@code add} completes the event's future with that same map.
     *
     * @param map the committed offsets to hand back
     */
    private void completeFetchedCommittedOffsetApplicationEventSuccessfully(Map<TopicPartition, OffsetAndMetadata> map) {
        // Blocking path: the caller receives the map as the return value.
        Mockito.doReturn(map)
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));

        // Non-blocking path: the caller observes the map through the event's future.
        Mockito.doAnswer(invocation -> {
            FetchCommittedOffsetsEvent event = invocation.getArgument(0);
            event.future().complete(map);
            return null;
        })
            .when(applicationEventHandler)
            .add(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
    }

    /**
     * Stubs the application event handler so that {@code addAndGet} throws the given exception
     * whenever it is called with a {@link FetchCommittedOffsetsEvent}.
     *
     * @param exc the exception the stubbed call throws
     */
    private void completeFetchedCommittedOffsetApplicationEventExceptionally(Exception exc) {
        Mockito.doThrow(exc)
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@link UnsubscribeEvent} passed to
     * {@code add} unsubscribes the consumer's subscription state and then completes the event's
     * future with {@code null}.
     */
    private void completeUnsubscribeApplicationEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            UnsubscribeEvent event = invocation.getArgument(0);
            consumer.subscriptions().unsubscribe();
            event.future().complete(null);
            return null;
        })
            .when(applicationEventHandler)
            .add(ArgumentMatchers.isA(UnsubscribeEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@link AssignmentChangeEvent} passed to
     * {@code addAndGet} assigns a copy of the event's partitions to the consumer's subscription
     * state and then completes the event's future with {@code null}.
     */
    private void completeAssignmentChangeEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            AssignmentChangeEvent event = invocation.getArgument(0);
            // Diamond instead of a raw HashSet: the element type is inferred from the event's
            // partitions, so assignFromUser no longer receives an unchecked raw set.
            consumer.subscriptions().assignFromUser(new HashSet<>(event.partitions()));
            event.future().complete(null);
            return null;
        })
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(AssignmentChangeEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@link TopicSubscriptionChangeEvent}
     * passed to {@code addAndGet} subscribes the consumer's subscription state using the event's
     * topics and listener, then completes the event's future with {@code null}.
     */
    private void completeTopicSubscriptionChangeEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            TopicSubscriptionChangeEvent event = invocation.getArgument(0);
            consumer.subscriptions().subscribe(event.topics(), event.listener());
            event.future().complete(null);
            return null;
        })
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(TopicSubscriptionChangeEvent.class));
    }

    /**
     * Stubs the application event handler so that every
     * {@link TopicPatternSubscriptionChangeEvent} passed to {@code addAndGet} subscribes the
     * consumer's subscription state using the event's pattern and listener, then completes the
     * event's future with {@code null}.
     */
    private void completeTopicPatternSubscriptionChangeEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            TopicPatternSubscriptionChangeEvent event = invocation.getArgument(0);
            consumer.subscriptions().subscribe(event.pattern(), event.listener());
            event.future().complete(null);
            return null;
        })
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(TopicPatternSubscriptionChangeEvent.class));
    }

    /**
     * Stubs the application event handler so that every
     * {@link TopicRe2JPatternSubscriptionChangeEvent} passed to {@code addAndGet} subscribes the
     * consumer's subscription state using the event's pattern and listener, then completes the
     * event's future with {@code null}.
     */
    private void completeTopicRe2JPatternSubscriptionChangeEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            TopicRe2JPatternSubscriptionChangeEvent event = invocation.getArgument(0);
            consumer.subscriptions().subscribe(event.pattern(), event.listener());
            event.future().complete(null);
            return null;
        })
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@link SeekUnvalidatedEvent} passed to
     * {@code addAndGet} performs an unvalidated seek on the consumer's subscription state and
     * then completes the event's future with {@code null}.
     *
     * <p>The fetch position is built from the event's offset and offset epoch together with the
     * current leader that {@code metadata} reports for the event's partition.
     */
    private void completeSeekUnvalidatedEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            SeekUnvalidatedEvent event = invocation.getArgument(0);
            SubscriptionState subscriptions = consumer.subscriptions();
            SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
                event.offset(),
                event.offsetEpoch(),
                metadata.currentLeader(event.partition()));
            subscriptions.seekUnvalidated(event.partition(), position);
            event.future().complete(null);
            return null;
        })
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(SeekUnvalidatedEvent.class));
    }

    /**
     * Issues a no-argument {@code commitAsync()} on the consumer under test.
     *
     * <p>NOTE(review): judging by the method name, this is presumably used to make the consumer
     * run pending commit callbacks; that behaviour lives in the consumer and is not visible here.
     */
    private void forceCommitCallbackInvocation() {
        consumer.commitAsync();
    }

    /**
     * Stubs the application event handler so that every {@link CommitEvent} passed to
     * {@code add} has its offsets marked ready. The event's future is left untouched.
     */
    private void markOffsetsReadyForCommitEvent() {
        Mockito.doAnswer(invocation -> {
            CommitEvent event = invocation.getArgument(0);
            event.markOffsetsReady();
            return null;
        })
            .when(applicationEventHandler)
            .add(ArgumentMatchers.isA(CommitEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@link PollEvent} passed to {@code add}
     * is marked as having completed reconciliation and auto-commit.
     */
    private void markReconcileAndAutoCommitCompleteForPollEvent() {
        Mockito.doAnswer(invocation -> {
            PollEvent event = invocation.getArgument(0);
            event.markReconcileAndAutoCommitComplete();
            return null;
        })
            .when(applicationEventHandler)
            .add(ArgumentMatchers.isA(PollEvent.class));
    }
}
