package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfigTest;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/** Unit tests for {@link ShareConsumerImpl} that replace its event handler, event reaper and fetch collector with Mockito mocks. */
public class ShareConsumerImplTest {
    // Consumer under test; tests assign it and resetAll() closes and clears it after each test.
    private ShareConsumerImpl<String, String> consumer;
    // Mock clock handed to every consumer built by the newConsumer(...) helpers.
    private final Time time = new MockTime(1);
    // Queue passed to the consumer as its background event queue; tests enqueue ErrorEvents on it directly.
    private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
    // Mockito stand-ins injected into the consumer instead of real collaborators.
    private final ShareFetchCollector<String, String> fetchCollector = Mockito.mock(ShareFetchCollector.class);
    private final ConsumerMetadata metadata = Mockito.mock(ConsumerMetadata.class);
    private final ApplicationEventHandler applicationEventHandler = Mockito.mock(ApplicationEventHandler.class);
    private final CompletableEventReaper backgroundEventReaper = Mockito.mock(CompletableEventReaper.class);

    /**
     * Per-test teardown: empties the background event queue, closes the consumer under test
     * (if one was created) with a zero timeout, and resets shared mock/interceptor state.
     *
     * <p>The shared-state cleanup runs in a {@code finally} block so that a failure while
     * closing the consumer cannot leave inline mocks or the static interceptor counters
     * behind for subsequent tests. Any exception from {@code close} still propagates.
     */
    @AfterEach
    public void resetAll() {
        try {
            this.backgroundEventQueue.clear();
            if (this.consumer != null) {
                this.consumer.close(Duration.ZERO);
            }
        } finally {
            this.consumer = null;
            Mockito.framework().clearInlineMocks();
            MockConsumerInterceptor.resetCounters();
        }
    }

    /**
     * Builds a consumer from the minimal required properties with {@code group.id}
     * set to {@code "group-id"}.
     */
    private ShareConsumerImpl<String, String> newConsumer() {
        return newConsumer(requiredConsumerPropertiesAndGroupId("group-id"));
    }

    /** Attempts to build a consumer whose {@code group.id} property is the empty string. */
    private ShareConsumerImpl<String, String> newConsumerWithEmptyGroupId() {
        Properties props = requiredConsumerProperties();
        props.put("group.id", "");
        return newConsumer(props);
    }

    /** Wraps the given properties in a {@link ConsumerConfig} and builds a consumer from it. */
    private ShareConsumerImpl<String, String> newConsumer(Properties properties) {
        ConsumerConfig config = new ConsumerConfig(properties);
        return newConsumer(config);
    }

    /**
     * Builds a consumer through the config-based constructor, using string deserializers and
     * the test's mock clock. Each factory lambda ignores its arguments and hands back the
     * corresponding mocked collaborator held by this test class.
     */
    private ShareConsumerImpl<String, String> newConsumer(ConsumerConfig consumerConfig) {
        return new ShareConsumerImpl<>(
            consumerConfig,
            new StringDeserializer(),
            new StringDeserializer(),
            this.time,
            // Always supply the mocked application event handler.
            (logCtx, clock, eventQueue, reaper, s1, s2, s3, metrics) -> this.applicationEventHandler,
            // Always supply the mocked event reaper.
            logCtx -> this.backgroundEventReaper,
            // Always supply the mocked fetch collector.
            (logCtx, consumerMetadata, subscriptions, fetchConfig, deserializers) -> this.fetchCollector,
            this.backgroundEventQueue);
    }

    /**
     * Builds a consumer around the given subscription state, with a mocked fetch buffer,
     * group id {@code "group-id"} and client id {@code "client-id"}.
     */
    private ShareConsumerImpl<String, String> newConsumer(SubscriptionState subscriptionState) {
        ShareFetchBuffer mockedBuffer = Mockito.mock(ShareFetchBuffer.class);
        return newConsumer(mockedBuffer, subscriptionState, "group-id", "client-id");
    }

    /**
     * Builds a consumer through the explicit-collaborator constructor, wiring in this test's
     * mocks, mock clock and background event queue.
     *
     * @param shareFetchBuffer  fetch buffer handed to the consumer
     * @param subscriptionState subscription state handed to the consumer
     * @param str               passed as the constructor's last argument (callers pass the group id)
     * @param str2              passed as the constructor's second argument (callers pass the client id)
     */
    private ShareConsumerImpl<String, String> newConsumer(ShareFetchBuffer shareFetchBuffer, SubscriptionState subscriptionState, String str, String str2) {
        return new ShareConsumerImpl<>(
            new LogContext(),
            str2,
            new StringDeserializer(),
            new StringDeserializer(),
            shareFetchBuffer,
            this.fetchCollector,
            this.time,
            this.applicationEventHandler,
            this.backgroundEventQueue,
            this.backgroundEventReaper,
            new Metrics(),
            subscriptionState,
            this.metadata,
            1000,
            str);
    }

    /**
     * A freshly built consumer can be closed without throwing once the acknowledge-on-close
     * and unsubscribe events are stubbed to complete.
     */
    @Test
    public void testSuccessfulStartupShutdown() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = newConsumer(subscriptions);

        completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
        completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);

        Assertions.assertDoesNotThrow(() -> this.consumer.close());
    }

    /**
     * Building a consumer with an empty group id fails with a {@link KafkaException} whose
     * cause is an {@link InvalidGroupIdException}.
     */
    @Test
    public void testInvalidGroupId() {
        KafkaException thrown = Assertions.assertThrows(KafkaException.class, this::newConsumerWithEmptyGroupId);
        Assertions.assertInstanceOf(InvalidGroupIdException.class, thrown.getCause());
    }

    /**
     * Building a consumer with an unloadable metric reporter class fails with a
     * {@link KafkaException} that names the share consumer and whose cause names the
     * missing class.
     */
    @Test
    public void testFailConstructor() {
        Properties props = requiredConsumerProperties();
        props.put("group.id", "group-id");
        // Use the consumer's own config key rather than a constant borrowed from the nested
        // helper of an unrelated test class.
        props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "an.invalid.class");
        ConsumerConfig config = new ConsumerConfig(props);

        KafkaException thrown = Assertions.assertThrows(KafkaException.class, () -> newConsumer(config));

        Assertions.assertTrue(
            thrown.getMessage().contains("Failed to construct Kafka share consumer"),
            "Unexpected exception message: " + thrown.getMessage());

        // Fail with a readable message instead of a NullPointerException when no cause is attached.
        Throwable cause = thrown.getCause();
        Assertions.assertNotNull(cause, "Expected the construction failure to carry a cause");
        Assertions.assertTrue(
            cause.getMessage().contains("Class an.invalid.class cannot be found"),
            "Unexpected cause: " + cause);
    }

    /**
     * A wakeup requested before {@code poll} makes the next poll throw
     * {@link WakeupException}; the poll after that does not throw.
     */
    @Test
    public void testWakeupBeforeCallingPoll() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = newConsumer(subscriptions);

        // The collector always reports an empty fetch.
        Mockito.doReturn(ShareFetch.empty())
            .when(this.fetchCollector)
            .collect(ArgumentMatchers.any(ShareFetchBuffer.class));

        List<String> topics = Collections.singletonList("foo");
        completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, topics);
        this.consumer.subscribe(topics);

        this.consumer.wakeup();

        Assertions.assertThrows(WakeupException.class, () -> this.consumer.poll(Duration.ZERO));
        Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ZERO));
    }

    /**
     * A wakeup triggered while the collector returns an empty fetch makes that poll throw
     * {@link WakeupException}; a following poll does not throw.
     */
    @Test
    public void testWakeupAfterEmptyFetch() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = newConsumer(subscriptions);

        // First collect(): wake the consumer up and return nothing. Later calls: just return nothing.
        Mockito.doAnswer(invocation -> {
            this.consumer.wakeup();
            return ShareFetch.empty();
        }).doAnswer(invocation -> ShareFetch.empty())
            .when(this.fetchCollector)
            .collect(ArgumentMatchers.any(ShareFetchBuffer.class));

        List<String> topics = Collections.singletonList("foo");
        completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, topics);
        this.consumer.subscribe(topics);

        Assertions.assertThrows(WakeupException.class, () -> this.consumer.poll(Duration.ofMinutes(1L)));
        Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ZERO));
    }

    /**
     * A wakeup triggered while the collector returns a non-empty fetch does not fail that
     * poll; the following poll throws {@link WakeupException} instead.
     */
    @Test
    public void testWakeupAfterNonEmptyFetch() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = newConsumer(subscriptions);

        // One in-flight batch holding a single record for partition foo-3.
        TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 3, "foo");
        ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(tip);
        batch.addRecord(new ConsumerRecord<>("foo", 3, 2L, "key1", "value1"));

        // Every collect(): wake the consumer up and return a fetch containing the batch.
        Mockito.doAnswer(invocation -> {
            this.consumer.wakeup();
            ShareFetch<String, String> fetch = ShareFetch.empty();
            fetch.add(tip, batch);
            return fetch;
        }).when(this.fetchCollector).collect(Mockito.any(ShareFetchBuffer.class));

        List<String> topics = Collections.singletonList("foo");
        completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, topics);
        this.consumer.subscribe(topics);

        Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ofMinutes(1L)));
        Assertions.assertThrows(WakeupException.class, () -> this.consumer.poll(Duration.ZERO));
    }

    /**
     * Calling {@code subscription()} on a closed consumer throws
     * {@link IllegalStateException} with the "already been closed" message.
     */
    @Test
    public void testFailOnClosedConsumer() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = newConsumer(subscriptions);

        completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
        completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
        this.consumer.close();

        IllegalStateException thrown =
            Assertions.assertThrows(IllegalStateException.class, this.consumer::subscription);
        Assertions.assertEquals("This consumer has already been closed.", thrown.getMessage());
    }

    /**
     * Closing the consumer sends a {@link ShareAcknowledgeOnCloseEvent} via {@code addAndGet}
     * and a {@link ShareUnsubscribeEvent} via {@code add}.
     */
    @Test
    public void testVerifyApplicationEventOnShutdown() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = newConsumer(subscriptions);

        completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
        completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
        this.consumer.close();

        Mockito.verify(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(ShareAcknowledgeOnCloseEvent.class));
        Mockito.verify(this.applicationEventHandler)
            .add(ArgumentMatchers.any(ShareUnsubscribeEvent.class));
    }

    /**
     * Setting a callback sends one registration event flagged as registered; setting the
     * same callback again does not send a second event.
     */
    @Test
    public void testAcknowledgementCommitCallbackRegistrationEvent() {
        this.consumer = newConsumer();
        AcknowledgementCommitCallback callback = Mockito.mock(AcknowledgementCommitCallback.class);

        this.consumer.setAcknowledgementCommitCallback(callback);
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.argThat((ApplicationEvent event) ->
            event instanceof ShareAcknowledgementCommitCallbackRegistrationEvent
                && ((ShareAcknowledgementCommitCallbackRegistrationEvent) event).isCallbackRegistered()));

        // Second registration: the total number of registration events must still be one.
        this.consumer.setAcknowledgementCommitCallback(callback);
        Mockito.verify(this.applicationEventHandler, Mockito.times(1))
            .add(ArgumentMatchers.any(ShareAcknowledgementCommitCallbackRegistrationEvent.class));
    }

    /**
     * Setting a {@code null} callback first sends nothing; setting a real callback sends one
     * registration event; setting {@code null} afterwards sends an event flagged as not registered.
     */
    @Test
    public void testAcknowledgementCommitCallbackRegistrationEvent_Null() {
        this.consumer = newConsumer();
        AcknowledgementCommitCallback callback = Mockito.mock(AcknowledgementCommitCallback.class);

        // Null before anything was registered: no event expected.
        this.consumer.setAcknowledgementCommitCallback((AcknowledgementCommitCallback) null);
        Mockito.verify(this.applicationEventHandler, Mockito.times(0))
            .add(ArgumentMatchers.any(ShareAcknowledgementCommitCallbackRegistrationEvent.class));

        // Real callback: exactly one event so far.
        this.consumer.setAcknowledgementCommitCallback(callback);
        Mockito.verify(this.applicationEventHandler, Mockito.times(1))
            .add(ArgumentMatchers.any(ShareAcknowledgementCommitCallbackRegistrationEvent.class));

        // Null again: exactly one event reporting the callback as not registered.
        this.consumer.setAcknowledgementCommitCallback((AcknowledgementCommitCallback) null);
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.argThat((ApplicationEvent event) ->
            event instanceof ShareAcknowledgementCommitCallbackRegistrationEvent
                && !((ShareAcknowledgementCommitCallbackRegistrationEvent) event).isCallbackRegistered()));
    }

    /**
     * {@code completeQuietly} never throws: a successful action leaves the error holder
     * empty, while a failing action stores its {@link KafkaException} in the holder.
     */
    @Test
    public void testCompleteQuietly() {
        AtomicReference<Throwable> firstError = new AtomicReference<>();
        CompletableFuture<Object> done = CompletableFuture.completedFuture(null);
        this.consumer = newConsumer();

        // Successful action: nothing recorded.
        Assertions.assertDoesNotThrow(() ->
            this.consumer.completeQuietly(() -> done.get(0L, TimeUnit.MILLISECONDS), "test", firstError));
        Assertions.assertNull(firstError.get());

        // Failing action: the exception is captured rather than propagated.
        Assertions.assertDoesNotThrow(() ->
            this.consumer.completeQuietly(() -> {
                throw new KafkaException("Test exception");
            }, "test", firstError));
        Assertions.assertInstanceOf(KafkaException.class, firstError.get());
    }

    /**
     * Subscribing to a topic sends a {@link ShareSubscriptionChangeEvent} via
     * {@code addAndGet}, and the consumer then reports that topic as its subscription.
     */
    @Test
    public void testSubscribeGeneratesEvent() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = newConsumer(subscriptions);

        List<String> topics = Collections.singletonList("topic1");
        completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, topics);
        this.consumer.subscribe(topics);

        Assertions.assertEquals(Collections.singleton("topic1"), this.consumer.subscription());
        Mockito.verify(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(ShareSubscriptionChangeEvent.class));
    }

    /**
     * Calling {@code unsubscribe()} hands a {@link ShareUnsubscribeEvent} to the handler's
     * {@code addAndGet}.
     */
    @Test
    public void testUnsubscribeGeneratesUnsubscribeEvent() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = newConsumer(subscriptions);
        completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);

        this.consumer.unsubscribe();

        Mockito.verify(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(ShareUnsubscribeEvent.class));
    }

    /**
     * Subscribing to an empty topic list hands a {@link ShareUnsubscribeEvent} to the
     * handler's {@code addAndGet}, just like an explicit unsubscribe.
     */
    @Test
    public void testSubscribeToEmptyListActsAsUnsubscribe() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = newConsumer(subscriptions);
        completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);

        this.consumer.subscribe(Collections.emptyList());

        Mockito.verify(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(ShareUnsubscribeEvent.class));
    }

    /** Subscribing with a {@code null} collection is rejected with {@link IllegalArgumentException}. */
    @Test
    public void testSubscribeToNullTopicCollection() {
        this.consumer = newConsumer();
        // Typed cast instead of a raw Collection cast, so the call stays free of unchecked conversions.
        Assertions.assertThrows(IllegalArgumentException.class,
            () -> this.consumer.subscribe((Collection<String>) null));
    }

    /** Subscribing to a list containing a {@code null} topic is rejected with {@link IllegalArgumentException}. */
    @Test
    public void testSubscriptionOnNullTopic() {
        this.consumer = newConsumer();
        Assertions.assertThrows(IllegalArgumentException.class,
            () -> this.consumer.subscribe(Collections.singletonList(null)));
    }

    /** Subscribing to a whitespace-only topic name is rejected with {@link IllegalArgumentException}. */
    @Test
    public void testSubscriptionOnEmptyTopic() {
        this.consumer = newConsumer();
        Assertions.assertThrows(IllegalArgumentException.class,
            () -> this.consumer.subscribe(Collections.singletonList("  ")));
    }

    /**
     * An {@link ErrorEvent} waiting on the background queue is rethrown by {@code poll}
     * as a {@link KafkaException} carrying the same message.
     */
    @Test
    public void testBackgroundError() {
        this.consumer = newConsumer(requiredConsumerPropertiesAndGroupId("shareGroupA"));

        KafkaException expected = new KafkaException("Nobody expects the Spanish Inquisition");
        this.backgroundEventQueue.add(new ErrorEvent(expected));
        this.consumer.subscribe(Collections.singletonList("t1"));

        KafkaException thrown =
            Assertions.assertThrows(KafkaException.class, () -> this.consumer.poll(Duration.ZERO));
        Assertions.assertEquals(expected.getMessage(), thrown.getMessage());
    }

    /**
     * With two {@link ErrorEvent}s queued, {@code poll} throws the message of the first
     * one and leaves the background queue empty.
     */
    @Test
    public void testMultipleBackgroundErrors() {
        this.consumer = newConsumer(requiredConsumerPropertiesAndGroupId("shareGroupA"));

        KafkaException first = new KafkaException("Nobody expects the Spanish Inquisition");
        KafkaException second = new KafkaException("Spam, Spam, Spam");
        this.backgroundEventQueue.add(new ErrorEvent(first));
        this.backgroundEventQueue.add(new ErrorEvent(second));
        this.consumer.subscribe(Collections.singletonList("t1"));

        KafkaException thrown =
            Assertions.assertThrows(KafkaException.class, () -> this.consumer.poll(Duration.ZERO));
        Assertions.assertEquals(first.getMessage(), thrown.getMessage());
        Assertions.assertTrue(this.backgroundEventQueue.isEmpty());
    }

    /**
     * Building a consumer without any {@code group.id} property fails with the generic
     * share-consumer construction message.
     */
    @Test
    public void testGroupIdNull() {
        ConsumerConfig configWithoutGroupId = new ConsumerConfig(requiredConsumerProperties());

        KafkaException thrown = Assertions.assertThrows(KafkaException.class,
            () -> this.consumer = newConsumer(configWithoutGroupId));

        Assertions.assertEquals("Failed to construct Kafka share consumer", thrown.getMessage());
    }

    /** An empty group id is rejected at construction time. */
    @Test
    public void testGroupIdEmpty() {
        String emptyGroupId = "";
        testInvalidGroupId(emptyGroupId);
    }

    /** A group id made only of spaces is rejected at construction time. */
    @Test
    public void testGroupIdOnlyWhitespaces() {
        String blankGroupId = "       ";
        testInvalidGroupId(blankGroupId);
    }

    /**
     * Asserts that building a consumer with the given group id fails with the generic
     * share-consumer construction message.
     *
     * @param str the group id to put into the consumer configuration
     */
    private void testInvalidGroupId(String str) {
        ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(str));

        KafkaException thrown = Assertions.assertThrows(KafkaException.class,
            () -> this.consumer = newConsumer(config));

        Assertions.assertEquals("Failed to construct Kafka share consumer", thrown.getMessage());
    }

    /**
     * Polling a subscribed consumer sends a {@link PollEvent} via {@code add} and a
     * {@link ShareSubscriptionChangeEvent} via {@code addAndGet}; closing it afterwards
     * sends a {@link ShareAcknowledgeOnCloseEvent} via {@code addAndGet}.
     */
    @Test
    public void testEnsurePollEventSentOnConsumerPoll() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = newConsumer(subscriptions);

        // A fetch holding one record for partition topic-0, returned by every collect() call.
        TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic", 0));
        ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(tip);
        batch.addRecord(new ConsumerRecord<>("topic", 0, 2L, "key1", "value1"));
        ShareFetch<String, String> fetch = ShareFetch.empty();
        fetch.add(tip, batch);
        Mockito.doAnswer(invocation -> fetch)
            .when(this.fetchCollector)
            .collect(Mockito.any(ShareFetchBuffer.class));

        List<String> topics = Collections.singletonList("topic");
        completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, topics);
        this.consumer.subscribe(topics);
        this.consumer.poll(Duration.ofMillis(100L));

        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(PollEvent.class));
        Mockito.verify(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(ShareSubscriptionChangeEvent.class));

        completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
        completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
        this.consumer.close();

        Mockito.verify(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(ShareAcknowledgeOnCloseEvent.class));
    }

    /**
     * Returns the minimal required consumer properties plus a {@code group.id} entry.
     *
     * @param str value stored under {@code group.id}
     */
    private Properties requiredConsumerPropertiesAndGroupId(String str) {
        Properties props = requiredConsumerProperties();
        props.put("group.id", str);
        return props;
    }

    /**
     * Returns a fresh {@link Properties} holding the key/value deserializer classes and the
     * bootstrap server address; no {@code group.id} is set.
     */
    private Properties requiredConsumerProperties() {
        Properties props = new Properties();
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        props.put("bootstrap.servers", "localhost:9091");
        return props;
    }

    /**
     * {@code processBackgroundEvents} keeps retrying a future whose {@code get} times out.
     * The mocked future times out on its first two calls (advancing the timer by the
     * requested wait each time) and returns on the third; 800 ms of the 1000 ms timer
     * must remain afterwards.
     */
    @Test
    public void testProcessBackgroundEventsWithInitialDelay() throws Exception {
        this.consumer = newConsumer();
        Timer timer = new MockTime().timer(1000L);
        CompletableFuture<?> future = Mockito.mock(CompletableFuture.class);
        CountDownLatch attemptsLeft = new CountDownLatch(3);

        Mockito.doAnswer(invocation -> {
            attemptsLeft.countDown();
            if (attemptsLeft.getCount() > 0) {
                // Simulate having waited the full requested timeout before giving up.
                long waitMs = invocation.getArgument(0, Long.class);
                timer.sleep(waitMs);
                throw new TimeoutException("Intentional timeout");
            }
            future.complete(null);
            return null;
        }).when(future).get(ArgumentMatchers.any(Long.class), ArgumentMatchers.any(TimeUnit.class));

        this.consumer.processBackgroundEvents(future, timer);

        Assertions.assertEquals(800L, timer.remainingMs());
    }

    /**
     * With an already-completed future, {@code processBackgroundEvents} returns without
     * consuming any of the 1000 ms timer.
     */
    @Test
    public void testProcessBackgroundEventsWithoutDelay() {
        this.consumer = newConsumer();
        Timer timer = new MockTime().timer(1000L);
        CompletableFuture<?> alreadyDone = CompletableFuture.completedFuture(null);

        this.consumer.processBackgroundEvents(alreadyDone, timer);

        Assertions.assertEquals(1000L, timer.remainingMs());
    }

    /**
     * When the future's {@code get} always times out (advancing the timer by the requested
     * wait each time), {@code processBackgroundEvents} eventually throws Kafka's
     * {@code TimeoutException} and the 1000 ms timer is fully used up.
     */
    @Test
    public void testProcessBackgroundEventsTimesOut() throws Exception {
        this.consumer = newConsumer();
        Timer timer = new MockTime().timer(1000L);
        CompletableFuture<?> future = Mockito.mock(CompletableFuture.class);

        Mockito.doAnswer(invocation -> {
            // Simulate having waited the full requested timeout before giving up.
            long waitMs = invocation.getArgument(0, Long.class);
            timer.sleep(waitMs);
            throw new TimeoutException("Intentional timeout");
        }).when(future).get(ArgumentMatchers.any(Long.class), ArgumentMatchers.any(TimeUnit.class));

        Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class,
            () -> this.consumer.processBackgroundEvents(future, timer));

        Assertions.assertEquals(0L, timer.remainingMs());
    }

    /**
     * Stubs the mocked handler's {@code addAndGet} for {@link ShareSubscriptionChangeEvent}:
     * the stub subscribes the given state to the given topics and completes the event's future.
     *
     * @param subscriptionState state updated by the stub
     * @param list              topics the state is subscribed to
     */
    private void completeShareSubscriptionChangeApplicationEventSuccessfully(SubscriptionState subscriptionState, List<String> list) {
        Mockito.doAnswer(invocation -> {
            ShareSubscriptionChangeEvent event = invocation.getArgument(0);
            subscriptionState.subscribeToShareGroup(new HashSet<>(list));
            event.future().complete(null);
            return null;
        }).when(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(ShareSubscriptionChangeEvent.class));
    }

    /**
     * Stubs the mocked handler's {@code add} for {@link ShareUnsubscribeEvent}: the stub
     * unsubscribes the given state and completes the event's future.
     *
     * @param subscriptionState state cleared by the stub
     */
    private void completeShareUnsubscribeApplicationEventSuccessfully(SubscriptionState subscriptionState) {
        Mockito.doAnswer(invocation -> {
            ShareUnsubscribeEvent event = invocation.getArgument(0);
            subscriptionState.unsubscribe();
            event.future().complete(null);
            return null;
        }).when(this.applicationEventHandler)
            .add(ArgumentMatchers.isA(ShareUnsubscribeEvent.class));
    }

    /**
     * Stubs the mocked handler's {@code addAndGet} for {@link ShareAcknowledgeOnCloseEvent}
     * so that the event's future is completed as soon as the event is submitted.
     */
    private void completeShareAcknowledgeOnCloseApplicationEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            ShareAcknowledgeOnCloseEvent event = invocation.getArgument(0);
            event.future().complete(null);
            return null;
        }).when(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(ShareAcknowledgeOnCloseEvent.class));
    }
}
