package org.apache.kafka.clients.consumer.internals.events;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.FetchRequestManager;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.class */
public class ApplicationEventProcessorTest {
    private final Time time = new MockTime();
    private final CommitRequestManager commitRequestManager = (CommitRequestManager) Mockito.mock(CommitRequestManager.class);
    private final ConsumerHeartbeatRequestManager heartbeatRequestManager = (ConsumerHeartbeatRequestManager) Mockito.mock(ConsumerHeartbeatRequestManager.class);
    private final ConsumerMembershipManager membershipManager = (ConsumerMembershipManager) Mockito.mock(ConsumerMembershipManager.class);
    private final OffsetsRequestManager offsetsRequestManager = (OffsetsRequestManager) Mockito.mock(OffsetsRequestManager.class);
    private SubscriptionState subscriptionState = (SubscriptionState) Mockito.mock(SubscriptionState.class);
    private final ConsumerMetadata metadata = (ConsumerMetadata) Mockito.mock(ConsumerMetadata.class);
    private ApplicationEventProcessor processor;

    private void setupProcessor(boolean z) {
        this.processor = new ApplicationEventProcessor(new LogContext(), new RequestManagers(new LogContext(), this.offsetsRequestManager, (TopicMetadataRequestManager) Mockito.mock(TopicMetadataRequestManager.class), (FetchRequestManager) Mockito.mock(FetchRequestManager.class), z ? Optional.of((CoordinatorRequestManager) Mockito.mock(CoordinatorRequestManager.class)) : Optional.empty(), z ? Optional.of(this.commitRequestManager) : Optional.empty(), z ? Optional.of(this.heartbeatRequestManager) : Optional.empty(), z ? Optional.of(this.membershipManager) : Optional.empty()), this.metadata, this.subscriptionState);
    }

    @Test
    public void testPrepClosingCommitEvents() {
        setupProcessor(true);
        ((CommitRequestManager) Mockito.doReturn(new NetworkClientDelegate.PollResult(100L, mockCommitResults())).when(this.commitRequestManager)).pollOnClose(ArgumentMatchers.anyLong());
        this.processor.process(new CommitOnCloseEvent());
        ((CommitRequestManager) Mockito.verify(this.commitRequestManager)).signalClose();
    }

    @Test
    public void testProcessUnsubscribeEventWithGroupId() {
        setupProcessor(true);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);
        Mockito.when(this.membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null));
        this.processor.process(new UnsubscribeEvent(0L));
        ((ConsumerMembershipManager) Mockito.verify(this.membershipManager)).leaveGroup();
    }

    @Test
    public void testProcessUnsubscribeEventWithoutGroupId() {
        setupProcessor(false);
        this.processor.process(new UnsubscribeEvent(0L));
        ((SubscriptionState) Mockito.verify(this.subscriptionState)).unsubscribe();
    }

    @MethodSource({"applicationEvents"})
    @ParameterizedTest
    public void testApplicationEventIsProcessed(ApplicationEvent applicationEvent) {
        ApplicationEventProcessor applicationEventProcessor = (ApplicationEventProcessor) Mockito.mock(ApplicationEventProcessor.class);
        applicationEventProcessor.process(applicationEvent);
        ((ApplicationEventProcessor) Mockito.verify(applicationEventProcessor)).process((ApplicationEvent) ArgumentMatchers.any(applicationEvent.getClass()));
    }

    private static Stream<Arguments> applicationEvents() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{new PollEvent(100L)}), Arguments.of(new Object[]{new CreateFetchRequestsEvent(CompletableEvent.calculateDeadlineMs(12345L, 100L))}), Arguments.of(new Object[]{new CheckAndUpdatePositionsEvent(500L)}), Arguments.of(new Object[]{new TopicMetadataEvent("topic", Long.MAX_VALUE)}), Arguments.of(new Object[]{new AssignmentChangeEvent(12345L, 12345L, Collections.emptyList())})});
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testListOffsetsEventIsProcessed(boolean z) {
        ApplicationEventProcessor applicationEventProcessor = (ApplicationEventProcessor) Mockito.mock(ApplicationEventProcessor.class);
        applicationEventProcessor.process(new ListOffsetsEvent(Collections.singletonMap(new TopicPartition("topic1", 1), 5L), CompletableEvent.calculateDeadlineMs(this.time, 100L), z));
        ((ApplicationEventProcessor) Mockito.verify(applicationEventProcessor)).process((ApplicationEvent) ArgumentMatchers.any(ListOffsetsEvent.class));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testAssignmentChangeEvent(boolean z) {
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        AssignmentChangeEvent assignmentChangeEvent = new AssignmentChangeEvent(12345L, 12345L, Collections.singleton(topicPartition));
        setupProcessor(z);
        ((SubscriptionState) Mockito.doReturn(true).when(this.subscriptionState)).assignFromUser(Collections.singleton(topicPartition));
        this.processor.process(assignmentChangeEvent);
        if (z) {
            ((CommitRequestManager) Mockito.verify(this.commitRequestManager)).updateTimerAndMaybeCommit(12345L);
        } else {
            ((CommitRequestManager) Mockito.verify(this.commitRequestManager, Mockito.never())).updateTimerAndMaybeCommit(12345L);
        }
        ((ConsumerMetadata) Mockito.verify(this.metadata)).requestUpdateForNewTopics();
        ((SubscriptionState) Mockito.verify(this.subscriptionState)).assignFromUser(Collections.singleton(topicPartition));
        Assertions.assertDoesNotThrow(() -> {
            return (Void) assignmentChangeEvent.future().get();
        });
    }

    @Test
    public void testAssignmentChangeEventWithException() {
        AssignmentChangeEvent assignmentChangeEvent = new AssignmentChangeEvent(12345L, 12345L, Collections.emptyList());
        setupProcessor(false);
        ((SubscriptionState) Mockito.doThrow(new Throwable[]{new IllegalStateException()}).when(this.subscriptionState)).assignFromUser((Set) ArgumentMatchers.any());
        this.processor.process(assignmentChangeEvent);
        Assertions.assertInstanceOf(IllegalStateException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            assignmentChangeEvent.future().get();
        })).getCause());
    }

    @Test
    public void testResetOffsetEvent() {
        ResetOffsetEvent resetOffsetEvent = new ResetOffsetEvent(Collections.singleton(new TopicPartition("topic", 0)), AutoOffsetResetStrategy.LATEST, 12345L);
        setupProcessor(false);
        this.processor.process(resetOffsetEvent);
        ((SubscriptionState) Mockito.verify(this.subscriptionState)).requestOffsetReset(resetOffsetEvent.topicPartitions(), resetOffsetEvent.offsetResetStrategy());
    }

    @Test
    public void testSeekUnvalidatedEvent() {
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        Optional of = Optional.of(1);
        SubscriptionState.FetchPosition fetchPosition = new SubscriptionState.FetchPosition(0L, of, Metadata.LeaderAndEpoch.noLeaderOrEpoch());
        SeekUnvalidatedEvent seekUnvalidatedEvent = new SeekUnvalidatedEvent(12345L, topicPartition, 0L, of);
        setupProcessor(false);
        ((ConsumerMetadata) Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(this.metadata)).currentLeader(topicPartition);
        ((SubscriptionState) Mockito.doNothing().when(this.subscriptionState)).seekUnvalidated((TopicPartition) ArgumentMatchers.eq(topicPartition), (SubscriptionState.FetchPosition) ArgumentMatchers.any());
        this.processor.process(seekUnvalidatedEvent);
        ((ConsumerMetadata) Mockito.verify(this.metadata)).updateLastSeenEpochIfNewer(topicPartition, ((Integer) of.get()).intValue());
        ((ConsumerMetadata) Mockito.verify(this.metadata)).currentLeader(topicPartition);
        ((SubscriptionState) Mockito.verify(this.subscriptionState)).seekUnvalidated(topicPartition, fetchPosition);
        Assertions.assertDoesNotThrow(() -> {
            return (Void) seekUnvalidatedEvent.future().get();
        });
    }

    @Test
    public void testSeekUnvalidatedEventWithException() {
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        SeekUnvalidatedEvent seekUnvalidatedEvent = new SeekUnvalidatedEvent(12345L, topicPartition, 0L, Optional.empty());
        setupProcessor(false);
        ((ConsumerMetadata) Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(this.metadata)).currentLeader(topicPartition);
        ((SubscriptionState) Mockito.doThrow(new Throwable[]{new IllegalStateException()}).when(this.subscriptionState)).seekUnvalidated((TopicPartition) ArgumentMatchers.eq(topicPartition), (SubscriptionState.FetchPosition) ArgumentMatchers.any());
        this.processor.process(seekUnvalidatedEvent);
        Assertions.assertInstanceOf(IllegalStateException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            seekUnvalidatedEvent.future().get();
        })).getCause());
    }

    @Test
    public void testPollEvent() {
        PollEvent pollEvent = new PollEvent(12345L);
        setupProcessor(true);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);
        this.processor.process(pollEvent);
        Assertions.assertTrue(pollEvent.reconcileAndAutoCommit().isDone());
        ((CommitRequestManager) Mockito.verify(this.commitRequestManager)).updateTimerAndMaybeCommit(12345L);
        ((ConsumerMembershipManager) Mockito.verify(this.membershipManager)).onConsumerPoll();
        ((ConsumerHeartbeatRequestManager) Mockito.verify(this.heartbeatRequestManager)).resetPollTimer(12345L);
    }

    @Test
    public void testTopicSubscriptionChangeEvent() {
        Set of = Set.of("topic1", "topic2");
        Optional of2 = Optional.of(new MockRebalanceListener());
        TopicSubscriptionChangeEvent topicSubscriptionChangeEvent = new TopicSubscriptionChangeEvent(of, of2, 12345L);
        setupProcessor(true);
        Mockito.when(Boolean.valueOf(this.subscriptionState.subscribe(of, of2))).thenReturn(true);
        Mockito.when(Integer.valueOf(this.metadata.requestUpdateForNewTopics())).thenReturn(1);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);
        this.processor.process(topicSubscriptionChangeEvent);
        ((SubscriptionState) Mockito.verify(this.subscriptionState)).subscribe(of, of2);
        ((ConsumerMetadata) Mockito.verify(this.metadata)).requestUpdateForNewTopics();
        Assertions.assertEquals(1, this.processor.metadataVersionSnapshot());
        ((ConsumerMembershipManager) Mockito.verify(this.membershipManager)).onSubscriptionUpdated();
        ((ConsumerMembershipManager) Mockito.verify(this.membershipManager, Mockito.never())).onConsumerPoll();
        Assertions.assertDoesNotThrow(() -> {
            return (Void) topicSubscriptionChangeEvent.future().get();
        });
    }

    @Test
    public void testFetchCommittedOffsetsEvent() {
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        TopicPartition topicPartition2 = new TopicPartition("topic", 1);
        TopicPartition topicPartition3 = new TopicPartition("topic", 2);
        Set of = Set.of(topicPartition, topicPartition2, topicPartition3);
        Map of2 = Map.of(topicPartition, new OffsetAndMetadata(10L, Optional.of(2), ""), topicPartition2, new OffsetAndMetadata(15L, Optional.empty(), ""), topicPartition3, new OffsetAndMetadata(20L, Optional.of(3), ""));
        FetchCommittedOffsetsEvent fetchCommittedOffsetsEvent = new FetchCommittedOffsetsEvent(of, 12345L);
        setupProcessor(true);
        Mockito.when(this.commitRequestManager.fetchOffsets(of, 12345L)).thenReturn(CompletableFuture.completedFuture(of2));
        this.processor.process(fetchCommittedOffsetsEvent);
        ((CommitRequestManager) Mockito.verify(this.commitRequestManager)).fetchOffsets(of, 12345L);
        Assertions.assertEquals(of2, Assertions.assertDoesNotThrow(() -> {
            return (Map) fetchCommittedOffsetsEvent.future().get();
        }));
    }

    @Test
    public void testTopicSubscriptionChangeEventWithIllegalSubscriptionState() {
        this.subscriptionState = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST);
        Optional of = Optional.of(new MockRebalanceListener());
        TopicSubscriptionChangeEvent topicSubscriptionChangeEvent = new TopicSubscriptionChangeEvent(Set.of("topic1", "topic2"), of, 12345L);
        this.subscriptionState.subscribe(Pattern.compile("topic.*"), of);
        setupProcessor(true);
        Mockito.when(Integer.valueOf(this.metadata.requestUpdateForNewTopics())).thenReturn(1);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);
        this.processor.process(topicSubscriptionChangeEvent);
        ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            topicSubscriptionChangeEvent.future().get();
        });
        Assertions.assertInstanceOf(IllegalStateException.class, executionException.getCause());
        Assertions.assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", executionException.getCause().getMessage());
    }

    @Test
    public void testTopicPatternSubscriptionChangeEvent() {
        Pattern compile = Pattern.compile("topic.*");
        Set of = Set.of("topic.1", "topic.2");
        Optional of2 = Optional.of(new MockRebalanceListener());
        TopicPatternSubscriptionChangeEvent topicPatternSubscriptionChangeEvent = new TopicPatternSubscriptionChangeEvent(compile, of2, 12345L);
        setupProcessor(true);
        Cluster cluster = (Cluster) Mockito.mock(Cluster.class);
        Mockito.when(this.metadata.fetch()).thenReturn(cluster);
        Mockito.when(cluster.topics()).thenReturn(of);
        Mockito.when(Boolean.valueOf(this.subscriptionState.matchesSubscribedPattern("topic.1"))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.subscriptionState.matchesSubscribedPattern("topic.2"))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.subscriptionState.subscribeFromPattern(of))).thenReturn(true);
        Mockito.when(Integer.valueOf(this.metadata.requestUpdateForNewTopics())).thenReturn(1);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);
        this.processor.process(topicPatternSubscriptionChangeEvent);
        ((SubscriptionState) Mockito.verify(this.subscriptionState)).subscribe(compile, of2);
        ((SubscriptionState) Mockito.verify(this.subscriptionState)).subscribeFromPattern(of);
        ((ConsumerMetadata) Mockito.verify(this.metadata, Mockito.times(2))).requestUpdateForNewTopics();
        Assertions.assertEquals(1, this.processor.metadataVersionSnapshot());
        ((ConsumerMembershipManager) Mockito.verify(this.membershipManager)).onSubscriptionUpdated();
        ((ConsumerMembershipManager) Mockito.verify(this.membershipManager, Mockito.never())).onConsumerPoll();
        Assertions.assertDoesNotThrow(() -> {
            return (Void) topicPatternSubscriptionChangeEvent.future().get();
        });
    }

    @Test
    public void testTopicPatternSubscriptionTriggersJoin() {
        TopicPatternSubscriptionChangeEvent topicPatternSubscriptionChangeEvent = new TopicPatternSubscriptionChangeEvent(Pattern.compile("topic.*"), Optional.of(new MockRebalanceListener()), 12345L);
        setupProcessor(true);
        Mockito.when(this.metadata.fetch()).thenReturn((Cluster) Mockito.mock(Cluster.class));
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);
        Mockito.when(Boolean.valueOf(this.subscriptionState.subscribeFromPattern((Set) ArgumentMatchers.any()))).thenReturn(false);
        this.processor.process(topicPatternSubscriptionChangeEvent);
        ((ConsumerMembershipManager) Mockito.verify(this.membershipManager)).onSubscriptionUpdated();
        Mockito.clearInvocations(new ConsumerMembershipManager[]{this.membershipManager});
        Mockito.when(Boolean.valueOf(this.subscriptionState.subscribeFromPattern((Set) ArgumentMatchers.any()))).thenReturn(true);
        this.processor.process(topicPatternSubscriptionChangeEvent);
        ((ConsumerMembershipManager) Mockito.verify(this.membershipManager)).onSubscriptionUpdated();
    }

    @Test
    public void testTopicPatternSubscriptionChangeEventWithIllegalSubscriptionState() {
        this.subscriptionState = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST);
        Optional of = Optional.of(new MockRebalanceListener());
        TopicPatternSubscriptionChangeEvent topicPatternSubscriptionChangeEvent = new TopicPatternSubscriptionChangeEvent(Pattern.compile("topic.*"), of, 12345L);
        setupProcessor(true);
        this.subscriptionState.subscribe(Set.of("topic.1", "topic.2"), of);
        this.processor.process(topicPatternSubscriptionChangeEvent);
        ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            topicPatternSubscriptionChangeEvent.future().get();
        });
        Assertions.assertInstanceOf(IllegalStateException.class, executionException.getCause());
        Assertions.assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", executionException.getCause().getMessage());
    }

    @Test
    public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewVersion() {
        UpdatePatternSubscriptionEvent updatePatternSubscriptionEvent = new UpdatePatternSubscriptionEvent(12345L);
        setupProcessor(true);
        Mockito.when(Boolean.valueOf(this.subscriptionState.hasPatternSubscription())).thenReturn(true);
        Mockito.when(Integer.valueOf(this.metadata.updateVersion())).thenReturn(0);
        this.processor.process(updatePatternSubscriptionEvent);
        Assertions.assertDoesNotThrow(() -> {
            return (Void) updatePatternSubscriptionEvent.future().get();
        });
        Cluster cluster = (Cluster) Mockito.mock(Cluster.class);
        Set of = Set.of("topic.1", "topic.2");
        Mockito.when(Integer.valueOf(this.metadata.updateVersion())).thenReturn(1);
        Mockito.when(Boolean.valueOf(this.subscriptionState.hasPatternSubscription())).thenReturn(true);
        Mockito.when(this.metadata.fetch()).thenReturn(cluster);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);
        Mockito.when(cluster.topics()).thenReturn(of);
        Mockito.when(Boolean.valueOf(this.subscriptionState.matchesSubscribedPattern("topic.1"))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.subscriptionState.matchesSubscribedPattern("topic.2"))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.subscriptionState.subscribeFromPattern(of))).thenReturn(true);
        Mockito.when(Integer.valueOf(this.metadata.requestUpdateForNewTopics())).thenReturn(1);
        UpdatePatternSubscriptionEvent updatePatternSubscriptionEvent2 = new UpdatePatternSubscriptionEvent(12345L);
        this.processor.process(updatePatternSubscriptionEvent2);
        ((ConsumerMetadata) Mockito.verify(this.metadata)).requestUpdateForNewTopics();
        ((SubscriptionState) Mockito.verify(this.subscriptionState)).subscribeFromPattern(of);
        Assertions.assertEquals(1, this.processor.metadataVersionSnapshot());
        ((ConsumerMembershipManager) Mockito.verify(this.membershipManager)).onSubscriptionUpdated();
        Assertions.assertDoesNotThrow(() -> {
            return (Void) updatePatternSubscriptionEvent2.future().get();
        });
    }

    @Test
    public void testR2JPatternSubscriptionEventSuccess() {
        SubscriptionPattern subscriptionPattern = new SubscriptionPattern("t*");
        Optional of = Optional.of((ConsumerRebalanceListener) Mockito.mock(ConsumerRebalanceListener.class));
        TopicRe2JPatternSubscriptionChangeEvent topicRe2JPatternSubscriptionChangeEvent = new TopicRe2JPatternSubscriptionChangeEvent(subscriptionPattern, of, 12345L);
        setupProcessor(true);
        this.processor.process(topicRe2JPatternSubscriptionChangeEvent);
        ((SubscriptionState) Mockito.verify(this.subscriptionState)).subscribe(subscriptionPattern, of);
        ((SubscriptionState) Mockito.verify(this.subscriptionState, Mockito.never())).subscribeFromPattern((Set) ArgumentMatchers.any());
        ((ConsumerMembershipManager) Mockito.verify(this.membershipManager)).onSubscriptionUpdated();
        Assertions.assertDoesNotThrow(() -> {
            return (Void) topicRe2JPatternSubscriptionChangeEvent.future().get();
        });
    }

    @Test
    public void testR2JPatternSubscriptionEventFailureWithMixedSubscriptionType() {
        SubscriptionPattern subscriptionPattern = new SubscriptionPattern("t*");
        Optional of = Optional.of((ConsumerRebalanceListener) Mockito.mock(ConsumerRebalanceListener.class));
        TopicRe2JPatternSubscriptionChangeEvent topicRe2JPatternSubscriptionChangeEvent = new TopicRe2JPatternSubscriptionChangeEvent(subscriptionPattern, of, 12345L);
        IllegalStateException illegalStateException = new IllegalStateException("Subscription to topics, partitions and pattern are mutually exclusive");
        ((SubscriptionState) Mockito.doThrow(new Throwable[]{illegalStateException}).when(this.subscriptionState)).subscribe(subscriptionPattern, of);
        setupProcessor(true);
        this.processor.process(topicRe2JPatternSubscriptionChangeEvent);
        ((SubscriptionState) Mockito.verify(this.subscriptionState)).subscribe(subscriptionPattern, of);
        Assertions.assertEquals(illegalStateException, (Exception) TestUtils.assertFutureThrows(topicRe2JPatternSubscriptionChangeEvent.future(), illegalStateException.getClass()));
    }

    @Test
    public void testSyncCommitEventWithEmptyOffsets() {
        Map of = Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L, Optional.of(1), ""));
        SyncCommitEvent syncCommitEvent = new SyncCommitEvent(Optional.empty(), 12345L);
        setupProcessor(true);
        ((SubscriptionState) Mockito.doReturn(of).when(this.subscriptionState)).allConsumed();
        ((CommitRequestManager) Mockito.doReturn(CompletableFuture.completedFuture(of)).when(this.commitRequestManager)).commitSync(of, 12345L);
        this.processor.process(syncCommitEvent);
        ((CommitRequestManager) Mockito.verify(this.commitRequestManager)).commitSync(of, 12345L);
        Assertions.assertTrue(syncCommitEvent.offsetsReady.isDone());
        Assertions.assertEquals(of, (Map) Assertions.assertDoesNotThrow(() -> {
            return (Map) syncCommitEvent.future().get();
        }));
    }

    @Test
    public void testSyncCommitEvent() {
        Map of = Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L, Optional.of(1), ""));
        SyncCommitEvent syncCommitEvent = new SyncCommitEvent(Optional.of(of), 12345L);
        setupProcessor(true);
        ((CommitRequestManager) Mockito.doReturn(CompletableFuture.completedFuture(of)).when(this.commitRequestManager)).commitSync(of, 12345L);
        this.processor.process(syncCommitEvent);
        ((CommitRequestManager) Mockito.verify(this.commitRequestManager)).commitSync(of, 12345L);
        Assertions.assertTrue(syncCommitEvent.offsetsReady.isDone());
        Assertions.assertEquals(of, (Map) Assertions.assertDoesNotThrow(() -> {
            return (Map) syncCommitEvent.future().get();
        }));
    }

    @Test
    public void testSyncCommitEventWithoutCommitRequestManager() {
        SyncCommitEvent syncCommitEvent = new SyncCommitEvent(Optional.empty(), 12345L);
        setupProcessor(false);
        this.processor.process(syncCommitEvent);
        TestUtils.assertFutureThrows(syncCommitEvent.future(), KafkaException.class);
    }

    @Test
    public void testSyncCommitEventWithException() {
        SyncCommitEvent syncCommitEvent = new SyncCommitEvent(Optional.empty(), 12345L);
        setupProcessor(true);
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.completeExceptionally(new IllegalStateException());
        ((CommitRequestManager) Mockito.doReturn(completableFuture).when(this.commitRequestManager)).commitSync((Map) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        this.processor.process(syncCommitEvent);
        ((CommitRequestManager) Mockito.verify(this.commitRequestManager)).commitSync(Collections.emptyMap(), 12345L);
        Assertions.assertTrue(syncCommitEvent.offsetsReady.isDone());
        TestUtils.assertFutureThrows(syncCommitEvent.future(), IllegalStateException.class);
    }

    @Test
    public void testAsyncCommitEventWithEmptyOffsets() {
        Map of = Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L, Optional.of(1), ""));
        AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(Optional.empty());
        setupProcessor(true);
        ((CommitRequestManager) Mockito.doReturn(CompletableFuture.completedFuture(of)).when(this.commitRequestManager)).commitAsync(of);
        ((SubscriptionState) Mockito.doReturn(of).when(this.subscriptionState)).allConsumed();
        this.processor.process(asyncCommitEvent);
        ((CommitRequestManager) Mockito.verify(this.commitRequestManager)).commitAsync(of);
        Assertions.assertTrue(asyncCommitEvent.offsetsReady.isDone());
        Assertions.assertEquals(of, (Map) Assertions.assertDoesNotThrow(() -> {
            return (Map) asyncCommitEvent.future().get();
        }));
    }

    @Test
    public void testAsyncCommitEvent() {
        Map of = Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L, Optional.of(1), ""));
        AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(Optional.of(of));
        setupProcessor(true);
        ((CommitRequestManager) Mockito.doReturn(CompletableFuture.completedFuture(of)).when(this.commitRequestManager)).commitAsync(of);
        this.processor.process(asyncCommitEvent);
        ((CommitRequestManager) Mockito.verify(this.commitRequestManager)).commitAsync(of);
        Assertions.assertTrue(asyncCommitEvent.offsetsReady.isDone());
        Assertions.assertEquals(of, (Map) Assertions.assertDoesNotThrow(() -> {
            return (Map) asyncCommitEvent.future().get();
        }));
    }

    @Test
    public void testAsyncCommitEventWithoutCommitRequestManager() {
        AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(Optional.empty());
        setupProcessor(false);
        this.processor.process(asyncCommitEvent);
        TestUtils.assertFutureThrows(asyncCommitEvent.future(), KafkaException.class);
    }

    @Test
    public void testAsyncCommitEventWithException() {
        AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(Optional.empty());
        setupProcessor(true);
        ((SubscriptionState) Mockito.doReturn(Collections.emptyMap()).when(this.subscriptionState)).allConsumed();
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.completeExceptionally(new IllegalStateException());
        ((CommitRequestManager) Mockito.doReturn(completableFuture).when(this.commitRequestManager)).commitAsync((Map) ArgumentMatchers.any());
        this.processor.process(asyncCommitEvent);
        ((CommitRequestManager) Mockito.verify(this.commitRequestManager)).commitAsync(Collections.emptyMap());
        Assertions.assertTrue(asyncCommitEvent.offsetsReady.isDone());
        TestUtils.assertFutureThrows(asyncCommitEvent.future(), IllegalStateException.class);
    }

    private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
        return Collections.singletonList((NetworkClientDelegate.UnsentRequest) Mockito.mock(NetworkClientDelegate.UnsentRequest.class));
    }
}
