package kafka.test.api;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import kafka.api.BaseConsumerTest;
import kafka.server.BrokerServer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
@Timeout(1200)
/* loaded from: input_file:kafka/test/api/ShareConsumerTest.class */
public class ShareConsumerTest {
    // Test cluster (one broker node, one controller node); built and started in createCluster, closed in destroyCluster.
    private KafkaClusterTestKit cluster;
    // Partition 0 of "topic", the primary topic created in createCluster.
    private final TopicPartition tp = new TopicPartition("topic", 0);
    // Partition 0 of "topic2", the second topic created in createCluster.
    private final TopicPartition tp2 = new TopicPartition("topic2", 0);
    // Partition 0 of "warmup"; presumably used by warmup(), which is not visible in this part of the file.
    private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
    // Persister class names used as @ValueSource values; createCluster passes the selected one
    // to the broker as "group.share.persister.class.name".
    private static final String DEFAULT_STATE_PERSISTER = "org.apache.kafka.server.share.persister.DefaultStatePersister";
    private static final String NO_OP_PERSISTER = "org.apache.kafka.server.share.persister.NoOpShareStatePersister";
    // Admin client assigned in createCluster and closed in destroyCluster.
    private Admin adminClient;

    /* loaded from: input_file:kafka/test/api/ShareConsumerTest$TestableAcknowledgementCommitCallback.class */
    private static class TestableAcknowledgementCommitCallback implements AcknowledgementCommitCallback {
        private final Map<TopicPartition, Set<Long>> partitionOffsetsMap;
        private final Map<TopicPartition, Exception> partitionExceptionMap;

        public TestableAcknowledgementCommitCallback(Map<TopicPartition, Set<Long>> map, Map<TopicPartition, Exception> map2) {
            this.partitionOffsetsMap = map;
            this.partitionExceptionMap = map2;
        }

        public void onComplete(Map<TopicIdPartition, Set<Long>> map, Exception exc) {
            map.forEach((topicIdPartition, set) -> {
                this.partitionOffsetsMap.merge(topicIdPartition.topicPartition(), set, (set, set2) -> {
                    HashSet hashSet = new HashSet();
                    hashSet.addAll(set);
                    hashSet.addAll(set2);
                    return hashSet;
                });
                if (this.partitionExceptionMap.containsKey(topicIdPartition.topicPartition())) {
                    return;
                }
                this.partitionExceptionMap.put(topicIdPartition.topicPartition(), exc);
            });
        }
    }

    /* loaded from: input_file:kafka/test/api/ShareConsumerTest$TestableAcknowledgementCommitCallbackThrows.class */
    private static class TestableAcknowledgementCommitCallbackThrows<K, V> implements AcknowledgementCommitCallback {
        private TestableAcknowledgementCommitCallbackThrows() {
        }

        public void onComplete(Map<TopicIdPartition, Set<Long>> map, Exception exc) {
            throw new OutOfOrderSequenceException("Exception thrown in TestableAcknowledgementCommitCallbackThrows.onComplete");
        }
    }

    /* loaded from: input_file:kafka/test/api/ShareConsumerTest$TestableAcknowledgementCommitCallbackWakeup.class */
    private static class TestableAcknowledgementCommitCallbackWakeup<K, V> implements AcknowledgementCommitCallback {
        private final KafkaShareConsumer<K, V> shareConsumer;

        TestableAcknowledgementCommitCallbackWakeup(KafkaShareConsumer<K, V> kafkaShareConsumer) {
            this.shareConsumer = kafkaShareConsumer;
        }

        public void onComplete(Map<TopicIdPartition, Set<Long>> map, Exception exc) {
            this.shareConsumer.wakeup();
        }
    }

    /* loaded from: input_file:kafka/test/api/ShareConsumerTest$TestableAcknowledgementCommitCallbackWithShareConsumer.class */
    private class TestableAcknowledgementCommitCallbackWithShareConsumer<K, V> implements AcknowledgementCommitCallback {
        private final KafkaShareConsumer<K, V> shareConsumer;

        TestableAcknowledgementCommitCallbackWithShareConsumer(KafkaShareConsumer<K, V> kafkaShareConsumer) {
            this.shareConsumer = kafkaShareConsumer;
        }

        public void onComplete(Map<TopicIdPartition, Set<Long>> map, Exception exc) {
            KafkaShareConsumer<K, V> kafkaShareConsumer = this.shareConsumer;
            Objects.requireNonNull(kafkaShareConsumer);
            Assertions.assertThrows(IllegalStateException.class, kafkaShareConsumer::close);
            Assertions.assertThrows(IllegalStateException.class, () -> {
                this.shareConsumer.subscribe(Collections.singleton(ShareConsumerTest.this.tp.topic()));
            });
            Assertions.assertThrows(IllegalStateException.class, () -> {
                this.shareConsumer.poll(Duration.ofMillis(5000L));
            });
        }
    }

    /**
     * Builds, formats and starts a one-broker, one-controller cluster with share groups enabled,
     * creates the two test topics, opens the admin client and runs {@code warmup()}.
     *
     * <p>The persister class is taken from the text after the first {@code '='} of the test's
     * display name when that name contains {@code ".persister="}; otherwise
     * {@link #NO_OP_PERSISTER} is used.
     *
     * @param testInfo supplies the display name of the test about to run
     * @throws Exception if building or starting the cluster, or any later setup step, fails
     */
    @BeforeEach
    public void createCluster(TestInfo testInfo) throws Exception {
        final String displayName = testInfo.getDisplayName();
        final String persisterClassName = displayName.contains(".persister=")
            ? displayName.split("=")[1]
            : NO_OP_PERSISTER;

        final TestKitNodes nodes = new TestKitNodes.Builder()
            .setNumBrokerNodes(1)
            .setNumControllerNodes(1)
            .build();

        cluster = new KafkaClusterTestKit.Builder(nodes)
            .setConfigProp("auto.create.topics.enable", "false")
            .setConfigProp("group.coordinator.rebalance.protocols", "classic,consumer,share")
            .setConfigProp("group.share.enable", "true")
            .setConfigProp("group.share.partition.max.record.locks", "10000")
            .setConfigProp("group.share.persister.class.name", persisterClassName)
            .setConfigProp("group.share.record.lock.duration.ms", "15000")
            .setConfigProp("offsets.topic.replication.factor", "1")
            .setConfigProp("share.coordinator.state.topic.min.isr", "1")
            .setConfigProp("share.coordinator.state.topic.num.partitions", "3")
            .setConfigProp("share.coordinator.state.topic.replication.factor", "1")
            .setConfigProp("transaction.state.log.min.isr", "1")
            .setConfigProp("transaction.state.log.replication.factor", "1")
            .setConfigProp("unstable.api.versions.enable", "true")
            .build();

        // Bring the cluster up and wait until it can serve requests.
        cluster.format();
        cluster.startup();
        cluster.waitForActiveController();
        cluster.waitForReadyBrokers();

        createTopic("topic");
        createTopic("topic2");
        adminClient = createAdminClient();
        warmup();
    }

    /**
     * Closes the admin client and then the cluster.
     *
     * <p>Either field may still be {@code null} if {@code createCluster} failed part-way, so both
     * are null-checked. The cluster is closed in a {@code finally} block so a failure while
     * closing the admin client cannot leave the cluster running.
     *
     * @throws Exception if closing the admin client or the cluster fails
     */
    @AfterEach
    public void destroyCluster() throws Exception {
        try {
            if (adminClient != null) {
                adminClient.close();
            }
        } finally {
            if (cluster != null) {
                cluster.close();
            }
        }
    }

    /**
     * Checks that a share consumer with no subscription reports an empty subscription set and
     * that polling it throws {@link IllegalStateException}.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testPollNoSubscribeFails(String str) {
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Assertions.assertEquals(Collections.emptySet(), shareConsumer.subscription());
            Assertions.assertThrows(IllegalStateException.class,
                () -> shareConsumer.poll(Duration.ofMillis(500L)));
        }
    }

    /**
     * Subscribes to the (empty) test topic and checks that the subscription is reported back and
     * that a 500 ms poll returns no records.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testSubscribeAndPollNoRecords(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Set<String> subscription = Collections.singleton(tp.topic());
            shareConsumer.subscribe(subscription);
            Assertions.assertEquals(subscription, shareConsumer.subscription());

            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, records.count());
        }
    }

    /**
     * Subscribes, polls once, unsubscribes, and checks that the subscription becomes empty and
     * that the poll returned no records.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testSubscribePollUnsubscribe(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Set<String> subscription = Collections.singleton(tp.topic());
            shareConsumer.subscribe(subscription);
            Assertions.assertEquals(subscription, shareConsumer.subscription());

            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500L));
            shareConsumer.unsubscribe();

            Assertions.assertEquals(Collections.emptySet(), shareConsumer.subscription());
            Assertions.assertEquals(0, records.count());
        }
    }

    /**
     * Subscribes and polls twice with the same subscription, checking each time that the
     * subscription is reported back and that the poll returns no records.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testSubscribePollSubscribe(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Set<String> subscription = Collections.singleton(tp.topic());

            // First round.
            shareConsumer.subscribe(subscription);
            Assertions.assertEquals(subscription, shareConsumer.subscription());
            ConsumerRecords<byte[], byte[]> firstPoll = shareConsumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, firstPoll.count());

            // Second round: re-subscribing to the same topic.
            shareConsumer.subscribe(subscription);
            Assertions.assertEquals(subscription, shareConsumer.subscription());
            ConsumerRecords<byte[], byte[]> secondPoll = shareConsumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, secondPoll.count());
        }
    }

    /**
     * Subscribes, polls, unsubscribes, and checks that a further poll throws
     * {@link IllegalStateException} and that the earlier poll returned no records.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testSubscribeUnsubscribePollFails(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Set<String> subscription = Collections.singleton(tp.topic());
            shareConsumer.subscribe(subscription);
            Assertions.assertEquals(subscription, shareConsumer.subscription());

            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500L));
            shareConsumer.unsubscribe();
            Assertions.assertEquals(Collections.emptySet(), shareConsumer.subscription());

            // With no subscription left, polling is expected to be rejected.
            Assertions.assertThrows(IllegalStateException.class,
                () -> shareConsumer.poll(Duration.ofMillis(500L)));
            Assertions.assertEquals(0, records.count());
        }
    }

    /**
     * Subscribes, polls, then subscribes to an empty set, and checks that the subscription is
     * empty, that a further poll throws {@link IllegalStateException}, and that the earlier poll
     * returned no records.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testSubscribeSubscribeEmptyPollFails(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Set<String> subscription = Collections.singleton(tp.topic());
            shareConsumer.subscribe(subscription);
            Assertions.assertEquals(subscription, shareConsumer.subscription());

            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500L));
            shareConsumer.subscribe(Collections.emptySet());
            Assertions.assertEquals(Collections.emptySet(), shareConsumer.subscription());

            // An empty subscription is expected to be rejected by poll.
            Assertions.assertThrows(IllegalStateException.class,
                () -> shareConsumer.poll(Duration.ofMillis(500L)));
            Assertions.assertEquals(0, records.count());
        }
    }

    /**
     * Produces one record to the test topic, subscribes, and checks that a single poll of up to
     * five seconds returns exactly that record.
     *
     * <p>Producer and consumer are managed by try-with-resources so both are closed (consumer
     * first) even when an assertion fails.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testSubscriptionAndPoll(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(tp.topic()));
            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, records.count());
        }
    }

    /**
     * Produces the same record three times, polling after each send, and checks that every poll
     * returns exactly one record.
     *
     * <p>Only the first send is followed by a flush, as in the original test. Producer and
     * consumer are managed by try-with-resources so both are closed (consumer first) even when
     * an assertion fails.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testSubscriptionAndPollMultiple(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(tp.topic()));
            Assertions.assertEquals(1, shareConsumer.poll(Duration.ofMillis(5000L)).count());

            producer.send(record);
            Assertions.assertEquals(1, shareConsumer.poll(Duration.ofMillis(5000L)).count());

            producer.send(record);
            Assertions.assertEquals(1, shareConsumer.poll(Duration.ofMillis(5000L)).count());
        }
    }

    /**
     * Produces one record to each test topic, consumes the record from the first topic, switches
     * the subscription to the second topic, and polls until the acknowledgement commit callback
     * has reported both partitions. Finally asserts that neither partition was reported with an
     * exception.
     *
     * <p>Producer and consumer are managed by try-with-resources so both are closed (consumer
     * first) even when a wait times out or an assertion fails.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     * @throws ExecutionException   if waiting for the second send to complete fails
     * @throws InterruptedException if interrupted while waiting
     */
    @Flaky("KAFKA-18033")
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testAcknowledgementSentOnSubscriptionChange(String str) throws ExecutionException, InterruptedException {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
            Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();

            ProducerRecord<byte[], byte[]> firstTopicRecord =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> secondTopicRecord =
                new ProducerRecord<>(tp2.topic(), tp2.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(firstTopicRecord);
            // Block until the second send has completed before flushing.
            producer.send(secondTopicRecord).get();
            producer.flush();

            shareConsumer.setAcknowledgementCommitCallback(
                new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
            shareConsumer.subscribe(Collections.singleton(tp.topic()));

            TestUtils.waitForCondition(
                () -> shareConsumer.poll(Duration.ofMillis(2000L)).count() == 1,
                15000L, 100L,
                () -> "Failed to consume records for share consumer");

            // Change the subscription while the record from the first topic is still held.
            shareConsumer.subscribe(Collections.singletonList(tp2.topic()));

            TestUtils.waitForCondition(() -> {
                shareConsumer.poll(Duration.ofMillis(500L));
                return partitionExceptionMap.containsKey(tp) && partitionExceptionMap.containsKey(tp2);
            }, 15000L, 100L, () -> "Failed to consume records from the updated subscription");

            // A present key with a null value means the callback reported the partition without an exception.
            Assertions.assertNull(partitionExceptionMap.get(tp));
            Assertions.assertNull(partitionExceptionMap.get(tp2));
        }
    }

    /**
     * Produces one record, consumes it, then keeps polling until the acknowledgement commit
     * callback has reported the partition, and asserts it was reported without an exception.
     *
     * <p>Producer and consumer are managed by try-with-resources so both are closed (consumer
     * first) even when a wait times out or an assertion fails.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     * @throws Exception if a wait is interrupted
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String str) throws Exception {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
            Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();

            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.setAcknowledgementCommitCallback(
                new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
            shareConsumer.subscribe(Collections.singleton(tp.topic()));

            TestUtils.waitForCondition(
                () -> shareConsumer.poll(Duration.ofMillis(2000L)).count() == 1,
                15000L, 100L,
                () -> "Failed to consume records for share consumer");

            TestUtils.waitForCondition(() -> {
                shareConsumer.poll(Duration.ofMillis(500L));
                return partitionExceptionMap.containsKey(tp);
            }, 15000L, 100L, () -> "Failed to receive call to callback");

            // A present key with a null value means the callback reported the partition without an exception.
            Assertions.assertNull(partitionExceptionMap.get(tp));
        }
    }

    /**
     * Produces one record, consumes it, polls once more, closes the consumer explicitly, and
     * asserts that by then the acknowledgement commit callback has reported the partition
     * without an exception.
     *
     * <p>The explicit {@code close()} is the operation under test and is kept. The
     * try-with-resources closes the consumer again on exit, as the original did on its success
     * path, and also covers the failure path, where the original left the consumer open.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testAcknowledgementCommitCallbackOnClose(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
            Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();

            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.setAcknowledgementCommitCallback(
                new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
            shareConsumer.subscribe(Collections.singleton(tp.topic()));

            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, records.count());

            shareConsumer.poll(Duration.ofMillis(1000L));
            shareConsumer.close();

            Assertions.assertTrue(partitionExceptionMap.containsKey(tp));
            Assertions.assertNull(partitionExceptionMap.get(tp));
        }
    }

    /**
     * Produces one record, consumes it, sleeps for 20 seconds, and then polls until the
     * acknowledgement commit callback has reported the partition with an
     * {@link InvalidRecordStateException}.
     *
     * <p>Producer and consumer are managed by try-with-resources so both are closed (consumer
     * first) even when the wait times out or an assertion fails.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     * @throws Exception if the sleep or the wait is interrupted
     */
    @Flaky("KAFKA-18033")
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testAcknowledgementCommitCallbackInvalidRecordStateException(String str) throws Exception {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
            Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();

            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.setAcknowledgementCommitCallback(
                new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
            shareConsumer.subscribe(Collections.singleton(tp.topic()));

            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, records.count());

            // Longer than the 15000 ms "group.share.record.lock.duration.ms" set in createCluster;
            // presumably this lets the record's lock expire before it is acknowledged.
            Thread.sleep(20000L);

            TestUtils.waitForCondition(() -> {
                shareConsumer.poll(Duration.ofMillis(500L));
                return partitionExceptionMap.containsKey(tp)
                    && partitionExceptionMap.get(tp) instanceof InvalidRecordStateException;
            }, 15000L, 100L, () -> "Failed to be notified by InvalidRecordStateException");
        }
    }

    /**
     * Produces one record carrying the header {@code headerKey=headerValue}, consumes it, and
     * checks the header value on every consumed record that has that header.
     *
     * <p>A record without the header is not treated as a failure; only a present header with a
     * different value is.
     *
     * @param str persister class name for this invocation; it reaches {@code createCluster} via the display name
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testHeaders(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            record.headers().add("headerKey", "headerValue".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(tp.topic()));
            List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, 1);
            Assertions.assertEquals(1, records.size());

            for (ConsumerRecord<byte[], byte[]> consumed : records) {
                Header header = consumed.headers().lastHeader("headerKey");
                if (header != null) {
                    Assertions.assertEquals("headerValue", new String(header.value()));
                }
            }
        }
    }

    /**
     * Produces one record using the given serializer for values and consumes it using the given
     * deserializer for keys, asserting that exactly one record is consumed.
     *
     * <p>NOTE(review): the serializer is applied to the value and the deserializer to the key;
     * this asymmetry is kept from the original, so confirm it is intended.
     *
     * <p>Producer and consumer are managed by try-with-resources so both are closed (consumer
     * first) even when an assertion fails.
     *
     * @param serializer   value serializer handed to the producer
     * @param deserializer key deserializer handed to the share consumer
     */
    private void testHeadersSerializeDeserialize(Serializer<byte[]> serializer, Deserializer<byte[]> deserializer) {
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), serializer);
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(tp.topic()));
            List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, 1);
            Assertions.assertEquals(1, records.size());
        }
    }

    /**
     * Runs {@code testHeadersSerializeDeserialize} with the serializer and deserializer
     * implementations declared in {@code BaseConsumerTest}.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testHeadersSerializerDeserializer(String str) {
        Serializer<byte[]> headerSerializer = new BaseConsumerTest.SerializerImpl();
        Deserializer<byte[]> headerDeserializer = new BaseConsumerTest.DeserializerImpl();
        testHeadersSerializeDeserialize(headerSerializer, headerDeserializer);
    }

    /**
     * Produces 10000 timestamped messages and consumes them with a share consumer configured with
     * {@code max.poll.records=2}, checking topic, partition, timestamp type, timestamp, key, value
     * and serialized sizes of every record in the order returned by {@code consumeRecords}.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testMaxPollRecords(String str) {
        final int recordCount = 10000;
        alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
                new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1",
                Collections.singletonMap("max.poll.records", String.valueOf(2)))) {
            long startingTimestamp = System.currentTimeMillis();
            produceMessagesWithTimestamp(recordCount, startingTimestamp);

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            // The i-th consumed record is expected to carry timestamp start+i and "key i"/"value i".
            long index = 0;
            for (ConsumerRecord<byte[], byte[]> record : consumeRecords(shareConsumer, recordCount)) {
                String expectedKey = "key " + index;
                String expectedValue = "value " + index;
                Assertions.assertEquals(this.tp.topic(), record.topic());
                Assertions.assertEquals(this.tp.partition(), record.partition());
                Assertions.assertEquals(TimestampType.CREATE_TIME, record.timestampType());
                Assertions.assertEquals(startingTimestamp + index, record.timestamp());
                Assertions.assertEquals(expectedKey, new String(record.key()));
                Assertions.assertEquals(expectedValue, new String(record.value()));
                Assertions.assertEquals(expectedKey.length(), record.serializedKeySize());
                Assertions.assertEquals(expectedValue.length(), record.serializedValueSize());
                index++;
            }
        }
    }

    /**
     * Interleaves sends from a transactional producer (one committed and one aborted transaction)
     * with sends from a second producer, then checks that a single poll returns exactly the four
     * produced records at the offsets reported by the producers and that a further poll is empty.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     * @throws Exception if waiting on a send future fails
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testControlRecordsSkipped(String str) throws Exception {
        alterShareAutoOffsetReset("group1", "earliest");
        // All three clients are closed on every exit path, in reverse declaration order.
        try (KafkaProducer<byte[], byte[]> txnProducer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer(), "T1");
             KafkaProducer<byte[], byte[]> plainProducer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());

            txnProducer.initTransactions();

            // First transaction is committed; a send from the second producer lands inside it.
            txnProducer.beginTransaction();
            RecordMetadata committedTxnMeta = txnProducer.send(record).get();
            RecordMetadata firstPlainMeta = plainProducer.send(record).get();
            txnProducer.commitTransaction();

            // Second transaction is aborted.
            txnProducer.beginTransaction();
            RecordMetadata abortedTxnMeta = txnProducer.send(record).get();
            txnProducer.abortTransaction();

            RecordMetadata secondPlainMeta = plainProducer.send(record).get();

            // Producers are closed before consuming, as in the original flow; the
            // try-with-resources close that follows is a repeat call on an already-closed client.
            txnProducer.close();
            plainProducer.close();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
            ConsumerRecords<byte[], byte[]> polled = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(4, polled.count());

            List<ConsumerRecord<byte[], byte[]>> received = polled.records(this.tp);
            Assertions.assertEquals(committedTxnMeta.offset(), received.get(0).offset());
            Assertions.assertEquals(firstPlainMeta.offset(), received.get(1).offset());
            Assertions.assertEquals(abortedTxnMeta.offset(), received.get(2).offset());
            Assertions.assertEquals(secondPlainMeta.offset(), received.get(3).offset());

            // The fourth data record must not sit at offset 3 — presumably because transaction
            // markers occupy offsets in between (NOTE(review): confirm against broker behaviour).
            Assertions.assertNotEquals(3L, secondPlainMeta.offset());

            Assertions.assertEquals(0, shareConsumer.poll(Duration.ofMillis(500L)).count());
        }
    }

    /**
     * Polls one produced record, acknowledges it via {@code acknowledge(record)}, sends the same
     * record again and expects the next poll to return exactly one record.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testExplicitAcknowledgeSuccess(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // Consumer and producer are both closed on every exit path, including assertion failures.
        try (KafkaProducer<byte[], byte[]> producer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
            ConsumerRecords<byte[], byte[]> firstBatch = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, firstBatch.count());
            firstBatch.forEach(shareConsumer::acknowledge);

            producer.send(record);
            Assertions.assertEquals(1, shareConsumer.poll(Duration.ofMillis(5000L)).count());
        }
    }

    /**
     * Polls one produced record, acknowledges it, sends the same record again, and expects
     * {@code commitSync()} to return a result of size one followed by a poll returning one record.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testExplicitAcknowledgeCommitSuccess(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // Consumer and producer are both closed on every exit path, including assertion failures.
        try (KafkaProducer<byte[], byte[]> producer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
            ConsumerRecords<byte[], byte[]> firstBatch = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, firstBatch.count());
            firstBatch.forEach(shareConsumer::acknowledge);

            producer.send(record);
            Assertions.assertEquals(1, shareConsumer.commitSync().size());
            Assertions.assertEquals(1, shareConsumer.poll(Duration.ofMillis(5000L)).count());
        }
    }

    /**
     * With two share consumers in {@code group1}: the first polls three records, acknowledges the
     * records at offsets 0 and 1 and calls {@code commitAsync()}; the test then waits until the
     * second consumer receives a single record at offset 2, and finally waits for the first
     * consumer's acknowledgement commit callback to report the partition with a null exception.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     * @throws InterruptedException if interrupted while waiting on a condition
     */
    @Flaky("KAFKA-18033")
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testExplicitAcknowledgementCommitAsync(String str) throws InterruptedException {
        alterShareAutoOffsetReset("group1", "earliest");
        // All three clients are closed on every exit path, in reverse declaration order.
        try (KafkaProducer<byte[], byte[]> producer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> firstConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaShareConsumer<byte[], byte[]> secondConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record1 =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> record2 =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> record3 =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record1);
            producer.send(record2);
            producer.send(record3);
            producer.flush();

            firstConsumer.subscribe(Collections.singleton(this.tp.topic()));
            secondConsumer.subscribe(Collections.singleton(this.tp.topic()));

            // NOTE(review): left raw because the callback's constructor signature is not visible
            // here; parameterize these to match it.
            Map offsetsByPartition = new HashMap();
            Map exceptionByPartition = new HashMap();
            firstConsumer.setAcknowledgementCommitCallback(
                new TestableAcknowledgementCommitCallback(offsetsByPartition, exceptionByPartition));

            ConsumerRecords<byte[], byte[]> firstBatch = firstConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(3, firstBatch.count());

            // Acknowledge only the first two records; the third is left unacknowledged.
            Iterator<ConsumerRecord<byte[], byte[]>> batchIterator = firstBatch.iterator();
            ConsumerRecord<byte[], byte[]> firstRecord = batchIterator.next();
            ConsumerRecord<byte[], byte[]> secondRecord = batchIterator.next();
            Assertions.assertEquals(0L, firstRecord.offset());
            Assertions.assertEquals(1L, secondRecord.offset());
            firstConsumer.acknowledge(firstRecord);
            firstConsumer.acknowledge(secondRecord);
            firstConsumer.commitAsync();

            // The unacknowledged record at offset 2 should eventually reach the second consumer.
            TestUtils.waitForCondition(() -> {
                ConsumerRecords<byte[], byte[]> redelivered = secondConsumer.poll(Duration.ofMillis(1000L));
                return redelivered.count() == 1 && redelivered.iterator().next().offset() == 2;
            }, 30000L, 100L, () -> "Didn't receive timed out record");

            // The callback must not have run yet; it is only observed after the first consumer polls.
            Assertions.assertFalse(exceptionByPartition.containsKey(this.tp));

            TestUtils.waitForCondition(() -> {
                firstConsumer.poll(Duration.ofMillis(1000L));
                return exceptionByPartition.containsKey(this.tp);
            }, 30000L, 100L, () -> "Didn't receive call to callback");

            Assertions.assertNull(exceptionByPartition.get(this.tp));
        }
    }

    /**
     * Polls an empty topic and calls {@code commitAsync()} with nothing to acknowledge, then
     * produces one record, polls it, acknowledges it via {@code acknowledge(record)}, commits
     * asynchronously and waits for the acknowledgement commit callback to report the partition.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     * @throws InterruptedException if interrupted while waiting on the condition
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testImplicitModeNotTriggeredByPollWhenNoAcksToSend(String str) throws InterruptedException {
        alterShareAutoOffsetReset("group1", "earliest");
        // Consumer and producer are both closed on every exit path, including assertion failures.
        try (KafkaProducer<byte[], byte[]> producer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            // NOTE(review): left raw because the callback's constructor signature is not visible
            // here; parameterize these to match it.
            Map offsetsByPartition = new HashMap();
            Map exceptionByPartition = new HashMap();
            shareConsumer.setAcknowledgementCommitCallback(
                new TestableAcknowledgementCommitCallback(offsetsByPartition, exceptionByPartition));

            // Nothing has been produced yet, so the poll is empty and this commit has no acks to send.
            Assertions.assertEquals(0, shareConsumer.poll(Duration.ofMillis(5000L)).count());
            shareConsumer.commitAsync();

            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            ConsumerRecords<byte[], byte[]> polled = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());

            // The explicit acknowledge is expected to succeed despite the earlier poll/commitAsync.
            shareConsumer.acknowledge(polled.iterator().next());
            shareConsumer.commitAsync();

            TestUtils.waitForCondition(() -> {
                shareConsumer.poll(Duration.ofMillis(500L));
                return exceptionByPartition.containsKey(this.tp);
            }, 30000L, 100L, () -> "Didn't receive call to callback");
        }
    }

    /**
     * Polls three records, acknowledges the ones at offsets 0 and 1 and calls {@code commitAsync()},
     * then expects the next two polls to each return the single record at offset 2. That record is
     * acknowledged after the second of those polls, the consumer is polled once more and closed,
     * and the acknowledgement commit callback must have reported the partition with a null exception.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     */
    @Flaky("KAFKA-18033")
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testExplicitAcknowledgementCommitAsyncPartialBatch(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // Consumer and producer are both closed on every exit path, including assertion failures.
        try (KafkaProducer<byte[], byte[]> producer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record1 =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> record2 =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> record3 =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record1);
            producer.send(record2);
            producer.send(record3);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            // NOTE(review): left raw because the callback's constructor signature is not visible
            // here; parameterize these to match it.
            Map offsetsByPartition = new HashMap();
            Map exceptionByPartition = new HashMap();
            shareConsumer.setAcknowledgementCommitCallback(
                new TestableAcknowledgementCommitCallback(offsetsByPartition, exceptionByPartition));

            ConsumerRecords<byte[], byte[]> firstBatch = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(3, firstBatch.count());

            // Acknowledge only the first two records of the batch.
            Iterator<ConsumerRecord<byte[], byte[]>> batchIterator = firstBatch.iterator();
            ConsumerRecord<byte[], byte[]> firstRecord = batchIterator.next();
            ConsumerRecord<byte[], byte[]> secondRecord = batchIterator.next();
            Assertions.assertEquals(0L, firstRecord.offset());
            Assertions.assertEquals(1L, secondRecord.offset());
            shareConsumer.acknowledge(firstRecord);
            shareConsumer.acknowledge(secondRecord);
            shareConsumer.commitAsync();

            // The unacknowledged record at offset 2 is expected again on each of the next two polls.
            ConsumerRecords<byte[], byte[]> secondBatch = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, secondBatch.count());
            Assertions.assertEquals(2L, secondBatch.iterator().next().offset());

            ConsumerRecords<byte[], byte[]> thirdBatch = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, thirdBatch.count());
            ConsumerRecord<byte[], byte[]> thirdRecord = thirdBatch.iterator().next();
            Assertions.assertEquals(2L, thirdRecord.offset());
            shareConsumer.acknowledge(thirdRecord);

            shareConsumer.poll(Duration.ofMillis(500L));
            // Closed explicitly before the callback assertions, as in the original flow; the
            // try-with-resources close that follows is a repeat call on an already-closed consumer.
            shareConsumer.close();

            Assertions.assertTrue(exceptionByPartition.containsKey(this.tp));
            Assertions.assertNull(exceptionByPartition.get(this.tp));
        }
    }

    /**
     * Polls one record and acknowledges it with {@code RELEASE}, expects the next poll to return
     * one record again, acknowledges that with {@code ACCEPT}, and expects a final poll to be empty.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testExplicitAcknowledgeReleasePollAccept(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // Consumer and producer are both closed on every exit path, including assertion failures.
        try (KafkaProducer<byte[], byte[]> producer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            ConsumerRecords<byte[], byte[]> firstBatch = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, firstBatch.count());
            firstBatch.forEach(polled -> shareConsumer.acknowledge(polled, AcknowledgeType.RELEASE));

            ConsumerRecords<byte[], byte[]> secondBatch = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, secondBatch.count());
            secondBatch.forEach(polled -> shareConsumer.acknowledge(polled, AcknowledgeType.ACCEPT));

            Assertions.assertEquals(0, shareConsumer.poll(Duration.ofMillis(500L)).count());
        }
    }

    /**
     * Polls one record, acknowledges it with {@code RELEASE} and then again with {@code ACCEPT}
     * without polling in between, and expects the following poll to be empty.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testExplicitAcknowledgeReleaseAccept(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // Consumer and producer are both closed on every exit path, including assertion failures.
        try (KafkaProducer<byte[], byte[]> producer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            ConsumerRecords<byte[], byte[]> batch = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, batch.count());
            // Same batch acknowledged twice: the later ACCEPT follows the earlier RELEASE.
            batch.forEach(polled -> shareConsumer.acknowledge(polled, AcknowledgeType.RELEASE));
            batch.forEach(polled -> shareConsumer.acknowledge(polled, AcknowledgeType.ACCEPT));

            Assertions.assertEquals(0, shareConsumer.poll(Duration.ofMillis(500L)).count());
        }
    }

    /**
     * Polls one record, acknowledges it with {@code RELEASE}, and then closes the consumer without
     * polling again.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     */
    @Flaky("KAFKA-18033")
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testExplicitAcknowledgeReleaseClose(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // Consumer and producer are both closed on every exit path, including assertion failures.
        try (KafkaProducer<byte[], byte[]> producer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            ConsumerRecords<byte[], byte[]> batch = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, batch.count());
            batch.forEach(polled -> shareConsumer.acknowledge(polled, AcknowledgeType.RELEASE));
        }
    }

    /**
     * Polls one record and acknowledges it, polls again (expecting no records), and then expects a
     * second {@code acknowledge} of the earlier record to throw {@link IllegalStateException}.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testExplicitAcknowledgeThrowsNotInBatch(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // Consumer and producer are both closed on every exit path, including assertion failures.
        try (KafkaProducer<byte[], byte[]> producer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            ConsumerRecords<byte[], byte[]> batch = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, batch.count());
            ConsumerRecord<byte[], byte[]> polled = batch.records(this.tp).get(0);
            shareConsumer.acknowledge(polled);

            Assertions.assertEquals(0, shareConsumer.poll(Duration.ofMillis(500L)).count());

            // The record belongs to the previous batch, so acknowledging it now must be rejected.
            Assertions.assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(polled));
        }
    }

    /**
     * Polls one record without acknowledging it, polls again (expecting no records), and then
     * expects {@code acknowledge} of the earlier record to throw {@link IllegalStateException}.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testImplicitAcknowledgeFailsExplicit(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // Consumer and producer are both closed on every exit path, including assertion failures.
        try (KafkaProducer<byte[], byte[]> producer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            ConsumerRecords<byte[], byte[]> batch = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, batch.count());
            ConsumerRecord<byte[], byte[]> polled = batch.records(this.tp).get(0);

            // No acknowledge call was made before this second poll.
            Assertions.assertEquals(0, shareConsumer.poll(Duration.ofMillis(500L)).count());

            Assertions.assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(polled));
        }
    }

    /**
     * Polls one record without calling {@code acknowledge}, then expects the first
     * {@code commitSync()} to return a result of size one, a second {@code commitSync()} to return
     * an empty result, and a further poll to return no records.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testImplicitAcknowledgeCommitSync(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // Consumer and producer are both closed on every exit path, including assertion failures.
        try (KafkaProducer<byte[], byte[]> producer =
                 createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
            Assertions.assertEquals(1, shareConsumer.poll(Duration.ofMillis(5000L)).count());

            // First commit covers the polled record; the second has nothing left to commit.
            Assertions.assertEquals(1, shareConsumer.commitSync().size());
            Assertions.assertEquals(0, shareConsumer.commitSync().size());

            Assertions.assertEquals(0, shareConsumer.poll(Duration.ofMillis(500L)).count());
        }
    }

    /**
     * Produces three records, polls them, calls {@code commitAsync()} without any explicit
     * acknowledgement, and then keeps polling until the registered acknowledgement commit
     * callback has recorded an entry for {@code tp} in its exception map. The recorded
     * value is expected to be {@code null}.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the test body
     * @throws InterruptedException if interrupted while waiting for the callback
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testImplicitAcknowledgementCommitAsync(String str) throws InterruptedException {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources closes the consumer, then the producer, on every exit path;
        // the hand-rolled version leaked the consumer whenever an assertion failed.
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {

            // Three distinct record instances with identical contents.
            for (int n = 0; n < 3; n++) {
                producer.send(new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()));
            }
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(tp.topic()));

            // NOTE(review): left raw because the constructor signature of
            // TestableAcknowledgementCommitCallback is declared elsewhere in this file;
            // parameterize these once the expected map types are confirmed.
            Map partitionOffsets = new HashMap();
            Map partitionExceptions = new HashMap();
            shareConsumer.setAcknowledgementCommitCallback(
                new TestableAcknowledgementCommitCallback(partitionOffsets, partitionExceptions));

            Assertions.assertEquals(3, shareConsumer.poll(Duration.ofMillis(5000L)).count());

            shareConsumer.commitAsync();
            // Immediately after commitAsync() the callback has not recorded anything for tp.
            Assertions.assertFalse(partitionExceptions.containsKey(tp));

            // Keep polling until the callback has stored an entry for tp.
            TestUtils.waitForCondition(() -> {
                shareConsumer.poll(Duration.ofMillis(1000L));
                return partitionExceptions.containsKey(tp);
            }, 15000L, 100L, () -> "Acknowledgement commit callback did not receive the response yet");

            Assertions.assertNull(partitionExceptions.get(tp));
        }
    }

    /**
     * Creates a share consumer with {@code max.partition.fetch.bytes} set to 10000, produces a
     * small record followed by a record whose value is 10000 bytes long, and asserts that the
     * first poll returns exactly one record.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the test body
     * @throws Exception if either synchronous send fails or is interrupted
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testFetchRecordLargerThanMaxPartitionFetchBytes(String str) throws Exception {
        // Used both as the consumer's fetch limit and as the size of the big record's value.
        int maxPartitionFetchBytes = 10000;

        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources closes the consumer, then the producer, on every exit path;
        // the hand-rolled version leaked the consumer whenever an assertion failed.
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(),
                 "group1",
                 Collections.singletonMap("max.partition.fetch.bytes", String.valueOf(maxPartitionFetchBytes)))) {

            ProducerRecord<byte[], byte[]> smallRecord =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> bigRecord =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), new byte[maxPartitionFetchBytes]);

            // Block on each send so the small record is written before the big one.
            producer.send(smallRecord).get();
            producer.send(bigRecord).get();

            shareConsumer.subscribe(Collections.singleton(tp.topic()));
            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, records.count());
        }
    }

    /**
     * Runs two share consumers in different groups ("group1" and "group2") against the same
     * topic and checks, in three phases, how many records each one receives:
     * <ol>
     *   <li>3 records produced; both consumers poll until each has counted 3.</li>
     *   <li>2 more produced; only the first consumer polls, until it has counted 2.</li>
     *   <li>3 more produced; the first consumer is expected to count 3 and the second 5
     *       (the 2 it did not poll for in phase two plus the 3 new ones).</li>
     * </ol>
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the test body
     * @throws InterruptedException if interrupted while waiting for a condition
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testMultipleConsumersWithDifferentGroupIds(String str) throws InterruptedException {
        alterShareAutoOffsetReset("group1", "earliest");
        alterShareAutoOffsetReset("group2", "earliest");
        // try-with-resources closes consumer 2, consumer 1 and the producer (in that order) on
        // every exit path; the hand-rolled version leaked consumer 1 when the body threw.
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2")) {

            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());

            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
            shareConsumer2.subscribe(Collections.singleton(tp.topic()));

            // Phase 1: three records, both consumers poll.
            producer.send(record);
            producer.send(record);
            producer.send(record);
            producer.flush();

            // Running totals, accumulated across waitForCondition retries.
            AtomicInteger consumer1Count = new AtomicInteger();
            AtomicInteger consumer2Count = new AtomicInteger();

            // && short-circuits: consumer 2 is only polled once consumer 1 has reached its target.
            TestUtils.waitForCondition(
                () -> consumer1Count.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000L)).count()) == 3
                    && consumer2Count.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000L)).count()) == 3,
                15000L, 100L, () -> "Failed to consume records for both consumers");

            // Phase 2: two more records, only consumer 1 polls.
            producer.send(record);
            producer.send(record);

            consumer1Count.set(0);
            TestUtils.waitForCondition(
                () -> consumer1Count.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000L)).count()) == 2,
                15000L, 100L, () -> "Failed to consume records for share consumer 1");

            // Phase 3: three more records; consumer 2 still has the two from phase 2 outstanding.
            producer.send(record);
            producer.send(record);
            producer.send(record);

            consumer1Count.set(0);
            consumer2Count.set(0);
            TestUtils.waitForCondition(
                () -> consumer1Count.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000L)).count()) == 3
                    && consumer2Count.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000L)).count()) == 5,
                15000L, 100L, () -> "Failed to consume records for both consumers for the last batch");
        }
    }

    /**
     * Produces 2000 records and has two share consumers in the same group ("group1") poll
     * alternately, for at most 10 rounds, stopping early on a round in which neither consumer
     * receives anything. Asserts that the two consumers together received exactly 2000 records.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the test body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testMultipleConsumersInGroupSequentialConsumption(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources closes consumer 2, consumer 1 and the producer (in that order) on
        // every exit path; the hand-rolled version leaked consumer 1 when the body threw.
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {

            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());

            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
            shareConsumer2.subscribe(Collections.singleton(tp.topic()));

            int totalMessages = 2000;
            for (int sent = 0; sent < totalMessages; sent++) {
                producer.send(record);
            }
            producer.flush();

            int consumer1Count = 0;
            int consumer2Count = 0;
            int maxRounds = 10;
            for (int round = 0; round < maxRounds; round++) {
                ConsumerRecords<byte[], byte[]> records1 = shareConsumer1.poll(Duration.ofMillis(2000L));
                consumer1Count += records1.count();
                ConsumerRecords<byte[], byte[]> records2 = shareConsumer2.poll(Duration.ofMillis(2000L));
                consumer2Count += records2.count();
                // A round in which both polls come back empty ends the loop early.
                if (records1.count() + records2.count() == 0) {
                    break;
                }
            }

            Assertions.assertEquals(totalMessages, consumer1Count + consumer2Count);
        }
    }

    /**
     * Starts 4 asynchronous producers (5000 messages each) and 4 asynchronous consumers in
     * "group1", waits up to 60 seconds for each set of tasks to finish, and asserts that the
     * consumers' return values sum to 20000.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the test body
     * @throws InterruptedException if interrupted while awaiting the tasks
     * @throws ExecutionException if any producer or consumer task fails
     * @throws TimeoutException if the tasks do not finish within the 60 second limits
     */
    @Flaky("KAFKA-18033")
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testMultipleConsumersInGroupConcurrentConsumption(String str) throws InterruptedException, ExecutionException, TimeoutException {
        AtomicInteger totalMessagesConsumed = new AtomicInteger(0);

        int consumerCount = 4;
        int producerCount = 4;
        int messagesPerProducer = 5000;
        int totalMessages = producerCount * messagesPerProducer;

        String groupId = "group1";
        alterShareAutoOffsetReset(groupId, "earliest");

        List<CompletableFuture<Void>> producerFutures = new ArrayList<>();
        for (int p = 0; p < producerCount; p++) {
            producerFutures.add(CompletableFuture.runAsync(() -> produceMessages(messagesPerProducer)));
        }

        // Last argument handed to consumeMessages; presumably a fetch size limit — confirm against the helper.
        int maxBytes = 100000;
        List<CompletableFuture<Integer>> consumerFutures = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            final int consumerNumber = c + 1;
            consumerFutures.add(CompletableFuture.supplyAsync(
                () -> consumeMessages(totalMessagesConsumed, totalMessages, groupId, consumerNumber, 30, true, maxBytes)));
        }

        CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new)).get(60L, TimeUnit.SECONDS);
        CompletableFuture.allOf(consumerFutures.toArray(CompletableFuture[]::new)).get(60L, TimeUnit.SECONDS);

        // All futures are complete here, so join() returns immediately.
        int totalResult = consumerFutures.stream().mapToInt(CompletableFuture::join).sum();
        Assertions.assertEquals(totalMessages, totalResult);
    }

    /**
     * Runs 4 asynchronous producers (2000 messages each) to completion, then starts 2
     * asynchronous consumers in each of three groups ("group1", "group2", "group3"). Asserts
     * that the producers' return values sum to 8000 and that, for every group, the consumers'
     * return values also sum to 8000.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the test body
     * @throws ExecutionException if any consumer task fails
     * @throws InterruptedException if interrupted while awaiting the consumer tasks
     * @throws TimeoutException if the consumer tasks do not finish within 120 seconds
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String str) throws ExecutionException, InterruptedException, TimeoutException {
        // One shared counter per group, handed to every consumer task of that group.
        AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0);
        AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0);
        AtomicInteger totalMessagesConsumedGroup3 = new AtomicInteger(0);

        int producerCount = 4;
        int consumersPerGroup = 2;
        int messagesPerProducer = 2000;
        int totalMessages = producerCount * messagesPerProducer;

        alterShareAutoOffsetReset("group1", "earliest");
        alterShareAutoOffsetReset("group2", "earliest");
        alterShareAutoOffsetReset("group3", "earliest");

        List<CompletableFuture<Integer>> producerFutures = new ArrayList<>();
        for (int p = 0; p < producerCount; p++) {
            producerFutures.add(CompletableFuture.supplyAsync(() -> produceMessages(messagesPerProducer)));
        }
        // All producing finishes before any consumer is started.
        Assertions.assertDoesNotThrow(
            () -> CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new)).get(15L, TimeUnit.SECONDS),
            "Exception awaiting produceMessages");
        int actualMessagesSent = producerFutures.stream().mapToInt(CompletableFuture::join).sum();

        List<CompletableFuture<Integer>> group1Futures = new ArrayList<>();
        List<CompletableFuture<Integer>> group2Futures = new ArrayList<>();
        List<CompletableFuture<Integer>> group3Futures = new ArrayList<>();

        // Last argument handed to consumeMessages; presumably a fetch size limit — confirm against the helper.
        int maxBytes = 100000;
        for (int c = 0; c < consumersPerGroup; c++) {
            final int consumerNumber = c + 1;
            group1Futures.add(CompletableFuture.supplyAsync(
                () -> consumeMessages(totalMessagesConsumedGroup1, totalMessages, "group1", consumerNumber, 100, true, maxBytes)));
            group2Futures.add(CompletableFuture.supplyAsync(
                () -> consumeMessages(totalMessagesConsumedGroup2, totalMessages, "group2", consumerNumber, 100, true, maxBytes)));
            group3Futures.add(CompletableFuture.supplyAsync(
                () -> consumeMessages(totalMessagesConsumedGroup3, totalMessages, "group3", consumerNumber, 100, true, maxBytes)));
        }

        // Wait for every consumer of every group with a single combined deadline.
        CompletableFuture.allOf(
            Stream.of(group1Futures.stream(), group2Futures.stream(), group3Futures.stream())
                .flatMap(Function.identity())
                .toArray(CompletableFuture[]::new))
            .get(120L, TimeUnit.SECONDS);

        int totalGroup1 = group1Futures.stream().mapToInt(CompletableFuture::join).sum();
        int totalGroup2 = group2Futures.stream().mapToInt(CompletableFuture::join).sum();
        int totalGroup3 = group3Futures.stream().mapToInt(CompletableFuture::join).sum();

        Assertions.assertEquals(totalMessages, totalGroup1);
        Assertions.assertEquals(totalMessages, totalGroup2);
        Assertions.assertEquals(totalMessages, totalGroup3);
        Assertions.assertEquals(totalMessages, actualMessagesSent);
    }

    /**
     * Produces 1500 records, polls three times with the first consumer of "group1", asserts the
     * three polls together returned all 1500 records, and closes that consumer. A second consumer
     * in the same group then polls (at most 10 times) until the first consumer's first two polls
     * plus the second consumer's polls add up to 1500, which is asserted at the end.
     *
     * <p>The third poll of the first consumer is deliberately left out of the running total, so
     * the final assertion only holds if the second consumer receives that many records itself.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the test body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testConsumerCloseInGroupSequential(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources guarantees all three clients are closed on every exit path; the
        // hand-rolled version leaked consumer 1 when the body threw. The explicit close() calls
        // below are part of the scenario and stay; the automatic ones then repeat them, exactly
        // as the original success path did.
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {

            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());

            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
            shareConsumer2.subscribe(Collections.singleton(tp.topic()));

            int totalMessages = 1500;
            for (int sent = 0; sent < totalMessages; sent++) {
                producer.send(record);
            }
            // No flush() here: the producer is closed instead before consuming starts.
            producer.close();

            // Three polls on consumer 1; only the first two feed the running total.
            int firstPollCount = shareConsumer1.poll(Duration.ofMillis(5000L)).count();
            int secondPollCount = shareConsumer1.poll(Duration.ofMillis(5000L)).count();
            int thirdPollCount = shareConsumer1.poll(Duration.ofMillis(5000L)).count();
            int consumer1Count = firstPollCount + secondPollCount;

            Assertions.assertEquals(totalMessages, firstPollCount + secondPollCount + thirdPollCount);
            shareConsumer1.close();

            int consumer2Count = 0;
            int maxRetries = 10;
            for (int retry = 0; consumer1Count + consumer2Count < totalMessages && retry < maxRetries; retry++) {
                consumer2Count += shareConsumer2.poll(Duration.ofMillis(5000L)).count();
            }
            shareConsumer2.close();

            Assertions.assertEquals(totalMessages, consumer1Count + consumer2Count);
        }
    }

    /**
     * Starts 4 asynchronous producers (5000 messages each), runs one preliminary consumer task
     * to completion using the six-argument {@code consumeMessages} overload, and then starts 4
     * further asynchronous consumers in "group1". Asserts that the return values of those 4
     * consumers sum to 20000; the preliminary consumer's result is not counted.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the test body
     * @throws InterruptedException if interrupted while awaiting the tasks
     * @throws ExecutionException if any producer or consumer task fails
     * @throws TimeoutException if the tasks do not finish within the 60 second limits
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testMultipleConsumersInGroupFailureConcurrentConsumption(String str) throws InterruptedException, ExecutionException, TimeoutException {
        AtomicInteger totalMessagesConsumed = new AtomicInteger(0);

        int consumerCount = 4;
        int producerCount = 4;
        int messagesPerProducer = 5000;
        int totalMessages = producerCount * messagesPerProducer;

        String groupId = "group1";
        alterShareAutoOffsetReset(groupId, "earliest");

        List<CompletableFuture<Void>> producerFutures = new ArrayList<>();
        for (int p = 0; p < producerCount; p++) {
            producerFutures.add(CompletableFuture.runAsync(() -> produceMessages(messagesPerProducer)));
        }

        // Last argument handed to the seven-argument consumeMessages; presumably a fetch size
        // limit — confirm against the helper.
        int maxBytes = 1000000;

        // Preliminary consumer: own throw-away counter, consumer number 0, and `false` for the
        // flag the other consumers pass as `true`.
        // NOTE(review): going by the test name this is the "failing" consumer — confirm
        // what the flag does in consumeMessages.
        CompletableFuture<Integer> failedMessagesConsumedFuture = CompletableFuture.supplyAsync(
            () -> consumeMessages(new AtomicInteger(0), totalMessages, groupId, 0, 1, false));

        // It must have finished before the counted consumers start.
        Assertions.assertDoesNotThrow(
            () -> failedMessagesConsumedFuture.get(15L, TimeUnit.SECONDS),
            "Exception awaiting consumeMessages");

        List<CompletableFuture<Integer>> consumerFutures = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            final int consumerNumber = c + 1;
            consumerFutures.add(CompletableFuture.supplyAsync(
                () -> consumeMessages(totalMessagesConsumed, totalMessages, groupId, consumerNumber, 40, true, maxBytes)));
        }

        CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new)).get(60L, TimeUnit.SECONDS);
        CompletableFuture.allOf(consumerFutures.toArray(CompletableFuture[]::new)).get(60L, TimeUnit.SECONDS);

        // All futures are complete here, so join() returns immediately.
        int totalResult = consumerFutures.stream().mapToInt(CompletableFuture::join).sum();
        Assertions.assertEquals(totalMessages, totalResult);
    }

    /**
     * Checks that a record which was polled but then left alone for 20 seconds is delivered
     * again by the next poll.
     *
     * <p>Sequence: record 1 is produced and polled; the following poll returns nothing. Record 2
     * is produced and polled; the thread then sleeps for 20 seconds without polling. The poll
     * after the sleep is expected to return record 2 again, and the poll after that nothing.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the test body
     * @throws InterruptedException if the sleep is interrupted
     */
    @Flaky("KAFKA-18025")
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testAcquisitionLockTimeoutOnConsumer(String str) throws InterruptedException {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources closes the consumer, then the producer, on every exit path;
        // the hand-rolled version leaked the consumer whenever an assertion failed.
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {

            ProducerRecord<byte[], byte[]> producerRecord1 =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key_1".getBytes(), "value_1".getBytes());
            ProducerRecord<byte[], byte[]> producerRecord2 =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key_2".getBytes(), "value_2".getBytes());

            shareConsumer.subscribe(Set.of(tp.topic()));

            // Record 1: delivered once, then nothing more to read.
            producer.send(producerRecord1);
            producer.flush();

            ConsumerRecords<byte[], byte[]> firstBatch = shareConsumer.poll(Duration.ofMillis(5000L));
            ConsumerRecord<byte[], byte[]> firstRecord = firstBatch.records(tp).get(0);
            Assertions.assertEquals("key_1", new String(firstRecord.key()));
            Assertions.assertEquals("value_1", new String(firstRecord.value()));
            Assertions.assertEquals(1, firstBatch.count());

            Assertions.assertEquals(0, shareConsumer.poll(Duration.ofMillis(1000L)).count());

            // Record 2: delivered, then deliberately not followed by another poll.
            producer.send(producerRecord2);
            producer.flush();

            ConsumerRecords<byte[], byte[]> secondBatch = shareConsumer.poll(Duration.ofMillis(5000L));
            ConsumerRecord<byte[], byte[]> secondRecord = secondBatch.records(tp).get(0);
            Assertions.assertEquals("key_2", new String(secondRecord.key()));
            Assertions.assertEquals("value_2", new String(secondRecord.value()));
            Assertions.assertEquals(1, secondBatch.count());

            // NOTE(review): presumably longer than the broker's acquisition lock timeout
            // configured for this test cluster — confirm against the cluster setup.
            Thread.sleep(20000L);

            // Record 2 is expected to be handed out a second time.
            ConsumerRecords<byte[], byte[]> redeliveredBatch = shareConsumer.poll(Duration.ofMillis(5000L));
            ConsumerRecord<byte[], byte[]> redeliveredRecord = redeliveredBatch.records(tp).get(0);
            Assertions.assertEquals("key_2", new String(redeliveredRecord.key()));
            Assertions.assertEquals("value_2", new String(redeliveredRecord.value()));
            Assertions.assertEquals(1, redeliveredBatch.count());

            Assertions.assertEquals(0, shareConsumer.poll(Duration.ofMillis(1000L)).count());
        }
    }

    /**
     * Registers a {@code TestableAcknowledgementCommitCallbackWithShareConsumer} that is handed
     * the share consumer itself, produces one record, and polls twice. The method body contains
     * no assertions of its own.
     *
     * <p>NOTE(review): the callback class is declared elsewhere in this file; presumably it
     * asserts that calling back into the consumer is rejected — confirm there.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the test body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testAcknowledgementCommitCallbackCallsShareConsumerDisallowed(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources closes the consumer, then the producer, on every exit path;
        // the hand-rolled version leaked the consumer whenever the body threw.
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {

            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWithShareConsumer(shareConsumer));
            shareConsumer.subscribe(Collections.singleton(tp.topic()));

            // Two polls, results intentionally ignored.
            shareConsumer.poll(Duration.ofMillis(5000L));
            shareConsumer.poll(Duration.ofMillis(500L));
        }
    }

    /**
     * Registers a {@code TestableAcknowledgementCommitCallbackWakeup} that is handed the share
     * consumer, consumes one record, polls once more, and then keeps polling until a poll throws
     * {@link WakeupException}.
     *
     * <p>NOTE(review): the callback class is declared elsewhere in this file; presumably it
     * calls {@code wakeup()} on the consumer — confirm there.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the test body
     * @throws InterruptedException if interrupted while waiting for a condition
     */
    @Flaky("KAFKA-18033")
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup(String str) throws InterruptedException {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources closes the consumer, then the producer, on every exit path;
        // the hand-rolled version leaked the consumer whenever the body threw.
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {

            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWakeup(shareConsumer));
            shareConsumer.subscribe(Collections.singleton(tp.topic()));

            // Wait for the single record to be delivered.
            TestUtils.waitForCondition(
                () -> shareConsumer.poll(Duration.ofMillis(2000L)).count() == 1,
                15000L, 100L, () -> "Failed to consume records for share consumer");

            // One further poll whose result is ignored.
            shareConsumer.poll(Duration.ofMillis(2000L));

            // Poll until one of the polls surfaces a WakeupException.
            AtomicBoolean exceptionThrown = new AtomicBoolean(false);
            TestUtils.waitForCondition(() -> {
                try {
                    shareConsumer.poll(Duration.ofMillis(500L));
                } catch (WakeupException e) {
                    exceptionThrown.set(true);
                }
                return exceptionThrown.get();
            }, 15000L, 100L, () -> "Failed to receive expected exception");
        }
    }

    /**
     * Registers a {@code TestableAcknowledgementCommitCallbackThrows}, consumes one record, and
     * then keeps polling until a poll throws {@link OutOfOrderSequenceException}.
     *
     * <p>NOTE(review): the callback class is declared elsewhere in this file; presumably it is
     * the source of the exception that poll rethrows — confirm there.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by the test body
     * @throws InterruptedException if interrupted while waiting for a condition
     */
    @Flaky("KAFKA-18033")
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testAcknowledgementCommitCallbackThrowsException(String str) throws InterruptedException {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources closes the consumer, then the producer, on every exit path;
        // the hand-rolled version leaked the consumer whenever the body threw.
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {

            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();

            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackThrows());
            shareConsumer.subscribe(Collections.singleton(tp.topic()));

            // Wait for the single record to be delivered.
            TestUtils.waitForCondition(
                () -> shareConsumer.poll(Duration.ofMillis(2000L)).count() == 1,
                15000L, 100L, () -> "Failed to consume records for share consumer");

            // Poll until one of the polls surfaces an OutOfOrderSequenceException.
            AtomicBoolean exceptionThrown = new AtomicBoolean(false);
            TestUtils.waitForCondition(() -> {
                try {
                    shareConsumer.poll(Duration.ofMillis(500L));
                } catch (OutOfOrderSequenceException e) {
                    exceptionThrown.set(true);
                }
                return exceptionThrown.get();
            }, 15000L, 100L, () -> "Failed to receive expected exception");
        }
    }

    /**
     * Checks that {@code poll} throws {@link InterruptException} when the calling thread is already
     * interrupted, and that a following {@code poll} does not throw once the flag has been cleared.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by this method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testPollThrowsInterruptExceptionIfInterrupted(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources closes the consumer exactly once on every exit path.
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
            try {
                // Set the interrupt flag on the current thread before polling.
                Thread.currentThread().interrupt();
                Assertions.assertThrows(InterruptException.class, () -> shareConsumer.poll(Duration.ZERO));
            } finally {
                // Clear the flag whether or not the assertion passed, so it cannot leak out of this test.
                Thread.interrupted();
            }
            Assertions.assertDoesNotThrow(() -> shareConsumer.poll(Duration.ZERO), "Failed to consume records");
        }
    }

    /**
     * Subscribes to the topic name {@code "topic abc"} and expects the next {@code poll} to throw
     * {@link InvalidTopicException}.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by this method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources closes the consumer exactly once, even when the assertion fails.
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            shareConsumer.subscribe(Collections.singleton("topic abc"));
            Assertions.assertThrows(InvalidTopicException.class, () -> shareConsumer.poll(Duration.ofMillis(10000L)));
        }
    }

    /**
     * Calls {@code wakeup()} before polling, expects the next {@code poll} to throw
     * {@link WakeupException}, and then expects the record produced earlier to be returned by a
     * later {@code poll}.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by this method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testWakeupWithFetchedRecordsAvailable(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // Both clients are managed by try-with-resources so the consumer is also closed when an
        // assertion fails; resources are closed in reverse order (consumer first, then producer).
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            producer.send(new ProducerRecord<>(this.tp.topic(), this.tp.partition(), (Long) null, "key".getBytes(), "value".getBytes()));
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
            shareConsumer.wakeup();
            Assertions.assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ZERO));

            // After the wakeup has been reported, the produced record is still expected to arrive.
            Assertions.assertEquals(1, shareConsumer.poll(Duration.ofMillis(5000L)).count());
        }
    }

    /**
     * Subscribes to topic {@code "foo"} before creating it, then creates the topic and checks that
     * records produced afterwards are delivered to the share consumer.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by this method body
     * @throws InterruptedException if interrupted while waiting for the first record
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testSubscriptionFollowedByTopicCreation(String str) throws InterruptedException {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources guarantees the consumer is closed on failure too (consumer first, then producer).
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            String topic = "foo";
            // Subscribe first; the topic is only created afterwards.
            shareConsumer.subscribe(Collections.singleton(topic));
            createTopic(topic);

            ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(topic, 0, (Long) null, "key".getBytes(), "value".getBytes());
            producer.send(message);
            producer.flush();
            TestUtils.waitForCondition(
                () -> shareConsumer.poll(Duration.ofMillis(2000L)).count() == 1,
                15000L, 100L,
                () -> "Failed to consume records for share consumer, metadata sync failed");

            producer.send(message);
            Assertions.assertEquals(1, shareConsumer.poll(Duration.ofMillis(5000L)).count());
            producer.send(message);
            Assertions.assertEquals(1, shareConsumer.poll(Duration.ofMillis(5000L)).count());
        }
    }

    /**
     * Consumes from two topics ({@code "bar"} and {@code "baz"}), deletes {@code "bar"}, and checks
     * that records produced to {@code "baz"} are still delivered afterwards.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by this method body
     * @throws InterruptedException if interrupted while waiting for a send or for records
     * @throws ExecutionException if a synchronous send fails
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testSubscriptionAndPollFollowedByTopicDeletion(String str) throws InterruptedException, ExecutionException {
        String topicToDelete = "bar";
        String topicToKeep = "baz";
        createTopic(topicToDelete);
        createTopic(topicToKeep);
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources guarantees the consumer is closed on failure too (consumer first, then producer).
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> deletedTopicMessage = new ProducerRecord<>(topicToDelete, 0, (Long) null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> keptTopicMessage = new ProducerRecord<>(topicToKeep, 0, (Long) null, "key".getBytes(), "value".getBytes());
            shareConsumer.subscribe(Arrays.asList(topicToDelete, topicToKeep));

            // One record per topic is produced and consumed before the deletion.
            producer.send(deletedTopicMessage).get();
            TestUtils.waitForCondition(
                () -> shareConsumer.poll(Duration.ofMillis(2000L)).count() == 1,
                15000L, 100L, () -> "incorrect number of records");

            producer.send(keptTopicMessage).get();
            TestUtils.waitForCondition(
                () -> shareConsumer.poll(Duration.ofMillis(2000L)).count() == 1,
                15000L, 100L, () -> "incorrect number of records");

            deleteTopic(topicToDelete);
            // Nothing new was produced, so this poll is expected to come back empty.
            Assertions.assertEquals(0, shareConsumer.poll(Duration.ofMillis(500L)).count());

            // The remaining topic must keep delivering records after the other one is gone.
            producer.send(keptTopicMessage).get();
            TestUtils.waitForCondition(
                () -> shareConsumer.poll(Duration.ofMillis(2000L)).count() == 1,
                15000L, 100L, () -> "incorrect number of records");

            producer.send(keptTopicMessage).get();
            TestUtils.waitForCondition(
                () -> shareConsumer.poll(Duration.ofMillis(2000L)).count() == 1,
                15000L, 100L, () -> "incorrect number of records");
        }
    }

    /**
     * Produces records, deletes leading offsets through the admin client, and checks how many
     * records a fresh share consumer in {@code group1} receives after each deletion.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by this method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testLsoMovementByRecordsDeletion(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources closes the producer exactly once on every exit path.
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(this.tp.topic(), 0, (Long) null, "key".getBytes(), "value".getBytes());

            // Ten records are sent synchronously.
            for (int sent = 0; sent < 10; sent++) {
                Assertions.assertDoesNotThrow(() -> producer.send(message).get(), "Failed to send records");
            }

            // NOTE(review): the result of deleteRecords is not awaited before consuming — confirm this is intended.
            this.adminClient.deleteRecords(Collections.singletonMap(this.tp, RecordsToDelete.beforeOffset(5L)));
            Assertions.assertEquals(5, consumeMessages(new AtomicInteger(0), 5, "group1", 1, 10, true));

            // Five more records are sent synchronously.
            for (int sent = 0; sent < 5; sent++) {
                Assertions.assertDoesNotThrow(() -> producer.send(message).get(), "Failed to send records");
            }

            this.adminClient.deleteRecords(Collections.singletonMap(this.tp, RecordsToDelete.beforeOffset(14L)));
            Assertions.assertEquals(1, consumeMessages(new AtomicInteger(0), 1, "group1", 1, 10, true));

            this.adminClient.deleteRecords(Collections.singletonMap(this.tp, RecordsToDelete.beforeOffset(15L)));
            Assertions.assertEquals(0, consumeMessages(new AtomicInteger(0), 0, "group1", 1, 5, true));
        }
    }

    /**
     * Without altering {@code share.auto.offset.reset} for the group, expects the first poll after
     * the first produced record to return nothing and the poll after a second record to return one.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by this method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testShareAutoOffsetResetDefaultValue(String str) {
        // try-with-resources guarantees the producer is closed on failure too (producer first, then consumer).
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
            ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), (Long) null, "key".getBytes(), "value".getBytes());

            // The record produced before the first poll is expected not to be delivered
            // (presumably because the default reset policy starts at the latest offset — TODO confirm).
            producer.send(message);
            producer.flush();
            Assertions.assertEquals(0, shareConsumer.poll(Duration.ofMillis(5000L)).count());

            // The next record is expected to be delivered.
            producer.send(message);
            producer.flush();
            Assertions.assertEquals(1, shareConsumer.poll(Duration.ofMillis(5000L)).count());
        }
    }

    /**
     * With {@code share.auto.offset.reset} set to {@code earliest} for the group, expects each
     * produced record to be returned by the poll that follows it.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by this method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testShareAutoOffsetResetEarliest(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources guarantees the producer is closed on failure too (producer first, then consumer).
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
            ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), (Long) null, "key".getBytes(), "value".getBytes());

            producer.send(message);
            producer.flush();
            Assertions.assertEquals(1, shareConsumer.poll(Duration.ofMillis(5000L)).count());

            producer.send(message);
            producer.flush();
            Assertions.assertEquals(1, shareConsumer.poll(Duration.ofMillis(5000L)).count());
        }
    }

    /**
     * With {@code share.auto.offset.reset} set to {@code earliest}, produces ten records, deletes
     * those before offset 5, and expects five records to be consumed by {@code group1}.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by this method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testShareAutoOffsetResetEarliestAfterLsoMovement(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        // try-with-resources guarantees the producer is closed on failure too (producer first, then consumer).
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            // This consumer only subscribes; the records are read through consumeMessages below,
            // which opens its own consumer in the same group.
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
            ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), (Long) null, "key".getBytes(), "value".getBytes());

            // Ten records are sent synchronously.
            for (int sent = 0; sent < 10; sent++) {
                Assertions.assertDoesNotThrow(() -> producer.send(message).get(), "Failed to send records");
            }

            // NOTE(review): the result of deleteRecords is not awaited before consuming — confirm this is intended.
            this.adminClient.deleteRecords(Collections.singletonMap(this.tp, RecordsToDelete.beforeOffset(5L)));
            Assertions.assertEquals(5, consumeMessages(new AtomicInteger(0), 5, "group1", 1, 10, true));
        }
    }

    /**
     * Configures {@code group1} with {@code earliest} and {@code group2} with {@code latest}, then
     * checks that the first produced record is delivered only to {@code group1} while the second
     * one is delivered to both groups.
     *
     * @param str persister name supplied by {@code @ValueSource}; not read by this method body
     */
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    @ParameterizedTest(name = "{displayName}.persister={0}")
    public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String str) {
        alterShareAutoOffsetReset("group1", "earliest");
        alterShareAutoOffsetReset("group2", "latest");
        // Resources are closed once each, in reverse order: producer, latest consumer, earliest consumer.
        try (KafkaShareConsumer<byte[], byte[]> earliestConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaShareConsumer<byte[], byte[]> latestConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2");
             KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            earliestConsumer.subscribe(Collections.singleton(this.tp.topic()));
            latestConsumer.subscribe(Collections.singleton(this.tp.topic()));
            ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), (Long) null, "key".getBytes(), "value".getBytes());

            // First record: expected by the "earliest" group only.
            producer.send(message);
            producer.flush();
            Assertions.assertEquals(1, earliestConsumer.poll(Duration.ofMillis(5000L)).count());
            Assertions.assertEquals(0, latestConsumer.poll(Duration.ofMillis(5000L)).count());

            // Second record: expected by both groups.
            producer.send(message);
            Assertions.assertEquals(1, earliestConsumer.poll(Duration.ofMillis(5000L)).count());
            Assertions.assertEquals(1, latestConsumer.poll(Duration.ofMillis(5000L)).count());
        }
    }

    /**
     * Sends {@code i} copies of a fixed key/value record to {@code tp} and flushes the producer.
     *
     * @param i number of records to send
     * @return {@code i}, unchanged
     */
    private int produceMessages(int i) {
        // try-with-resources closes the producer exactly once on every exit path.
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), (Long) null, "key".getBytes(), "value".getBytes());
            for (int sent = 0; sent < i; sent++) {
                producer.send(message);
            }
            producer.flush();
            return i;
        }
    }

    /**
     * Sends {@code i} records to {@code tp}; record number {@code n} (starting at 0) carries the
     * timestamp {@code j + n}, key {@code "key n"} and value {@code "value n"}. The producer is
     * flushed before it is closed.
     *
     * @param i number of records to send
     * @param j timestamp assigned to the first record
     */
    private void produceMessagesWithTimestamp(int i, long j) {
        // try-with-resources also covers flush(), so the producer is closed even if flushing fails.
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            for (int index = 0; index < i; index++) {
                producer.send(new ProducerRecord<>(this.tp.topic(), this.tp.partition(), j + index, ("key " + index).getBytes(), ("value " + index).getBytes()));
            }
            producer.flush();
        }
    }

    /**
     * Creates a share consumer in group {@code str}, subscribes it to {@code tp}'s topic, and
     * delegates to the consumer-based {@code consumeMessages} overload. Any exception fails the
     * test through {@code assertDoesNotThrow}.
     *
     * @param atomicInteger counter that receives the number of records consumed
     * @param i target value for the counter; passed through to the delegate
     * @param str share group id for the consumer
     * @param i2 consumer number, used only in the failure message
     * @param i3 maximum number of polls
     * @param z whether to call {@code commitSync} after polling
     * @return number of records consumed by this call
     */
    private int consumeMessages(AtomicInteger atomicInteger, int i, String str, int i2, int i3, boolean z) {
        return Assertions.assertDoesNotThrow(() -> {
            // try-with-resources closes the consumer exactly once on every exit path.
            try (KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), str)) {
                shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
                return consumeMessages(shareConsumer, atomicInteger, i, i2, i3, z);
            }
        }, "Consumer " + i2 + " failed with exception");
    }

    /**
     * Same as the six-argument overload, but the consumer is created with
     * {@code max.partition.fetch.bytes} set to {@code i4}.
     *
     * @param atomicInteger counter that receives the number of records consumed
     * @param i target value for the counter; passed through to the delegate
     * @param str share group id for the consumer
     * @param i2 consumer number, used only in the failure message
     * @param i3 maximum number of polls
     * @param z whether to call {@code commitSync} after polling
     * @param i4 value for the {@code max.partition.fetch.bytes} consumer property
     * @return number of records consumed by this call
     */
    private int consumeMessages(AtomicInteger atomicInteger, int i, String str, int i2, int i3, boolean z, int i4) {
        return Assertions.assertDoesNotThrow(() -> {
            // try-with-resources closes the consumer exactly once on every exit path.
            try (KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), str, Map.of("max.partition.fetch.bytes", i4))) {
                shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
                return consumeMessages(shareConsumer, atomicInteger, i, i2, i3, z);
            }
        }, "Consumer " + i2 + " failed with exception");
    }

    /**
     * Polls {@code kafkaShareConsumer} up to {@code i3} times and adds every batch size to both a
     * local total and {@code atomicInteger}. When {@code i} is positive, polling also stops as soon
     * as {@code atomicInteger} reaches {@code i}; otherwise all {@code i3} polls are performed.
     * Any exception fails the test through {@code assertDoesNotThrow}.
     *
     * @param kafkaShareConsumer consumer to poll; it is not closed here
     * @param atomicInteger counter incremented by each batch size
     * @param i target value for the counter, or a non-positive value to always use every poll
     * @param i2 consumer number, used only in the failure message
     * @param i3 maximum number of polls
     * @param z whether to call {@code commitSync} after polling
     * @return number of records returned by the polls made in this call
     */
    private int consumeMessages(KafkaShareConsumer<byte[], byte[]> kafkaShareConsumer, AtomicInteger atomicInteger, int i, int i2, int i3, boolean z) {
        return Assertions.assertDoesNotThrow(() -> {
            int consumedHere = 0;
            // The counter check only applies when a positive target was requested.
            for (int attempt = 0; (i <= 0 || atomicInteger.get() < i) && attempt < i3; attempt++) {
                int batchSize = kafkaShareConsumer.poll(Duration.ofMillis(2000L)).count();
                consumedHere += batchSize;
                atomicInteger.addAndGet(batchSize);
            }
            if (z) {
                kafkaShareConsumer.commitSync(Duration.ofMillis(10000L));
            }
            return consumedHere;
        }, "Consumer " + i2 + " failed with exception");
    }

    /**
     * Polls {@code kafkaShareConsumer} in 100 ms steps until at least {@code i} records have been
     * collected, failing the test if more than 60 seconds have elapsed after a poll.
     *
     * @param kafkaShareConsumer consumer to poll; it is not closed here
     * @param i minimum number of records to collect
     * @return the collected records in the order they were returned by the polls
     */
    private <K, V> List<ConsumerRecord<K, V>> consumeRecords(KafkaShareConsumer<K, V> kafkaShareConsumer, int i) {
        List<ConsumerRecord<K, V>> collected = new ArrayList<>();
        long startMs = System.currentTimeMillis();
        while (collected.size() < i) {
            ConsumerRecords<K, V> batch = kafkaShareConsumer.poll(Duration.ofMillis(100L));
            batch.forEach(collected::add);
            // Checked after every poll so an under-filled topic cannot loop forever.
            Assertions.assertFalse(System.currentTimeMillis() - startMs > 60000, "Timed out before consuming expected records.");
        }
        return collected;
    }

    /**
     * Creates topic {@code str} with one partition and replication factor one, waiting for the
     * request to complete. Any exception fails the test with "Failed to create topic".
     *
     * @param str name of the topic to create
     */
    private void createTopic(String str) {
        Properties clientProperties = this.cluster.clientProperties();
        Assertions.assertDoesNotThrow(() -> {
            // A short-lived admin client; try-with-resources closes it exactly once.
            try (Admin admin = Admin.create(clientProperties)) {
                admin.createTopics(Collections.singleton(new NewTopic(str, 1, (short) 1))).all().get();
            }
        }, "Failed to create topic");
    }

    /**
     * Deletes topic {@code str}, waiting for the request to complete. Any exception fails the test
     * with "Failed to delete topic".
     *
     * @param str name of the topic to delete
     */
    private void deleteTopic(String str) {
        Properties clientProperties = this.cluster.clientProperties();
        Assertions.assertDoesNotThrow(() -> {
            // A short-lived admin client; try-with-resources closes it exactly once.
            try (Admin admin = Admin.create(clientProperties)) {
                admin.deleteTopics(Collections.singleton(str)).all().get();
            }
        }, "Failed to delete topic");
    }

    /**
     * Builds an {@link Admin} client from the cluster's client properties.
     *
     * @return a new admin client; the caller is responsible for closing it
     */
    private Admin createAdminClient() {
        Properties adminProps = this.cluster.clientProperties();
        return Admin.create(adminProps);
    }

    /**
     * Builds a producer from the cluster's client properties.
     *
     * @param serializer key serializer
     * @param serializer2 value serializer
     * @return a new producer; the caller is responsible for closing it
     */
    private <K, V> KafkaProducer<K, V> createProducer(Serializer<K> serializer, Serializer<V> serializer2) {
        Properties producerProps = this.cluster.clientProperties();
        return new KafkaProducer<>(producerProps, serializer, serializer2);
    }

    /**
     * Builds a producer from the cluster's client properties with {@code transactional.id} set.
     *
     * @param serializer key serializer
     * @param serializer2 value serializer
     * @param str value for the {@code transactional.id} property
     * @return a new producer; the caller is responsible for closing it
     */
    private <K, V> KafkaProducer<K, V> createProducer(Serializer<K> serializer, Serializer<V> serializer2, String str) {
        Properties producerProps = this.cluster.clientProperties();
        producerProps.put("transactional.id", str);
        KafkaProducer<K, V> transactionalProducer = new KafkaProducer<>(producerProps, serializer, serializer2);
        return transactionalProducer;
    }

    /**
     * Builds a share consumer from the cluster's client properties with {@code group.id} set.
     *
     * @param deserializer key deserializer
     * @param deserializer2 value deserializer
     * @param str value for the {@code group.id} property
     * @return a new share consumer; the caller is responsible for closing it
     */
    private <K, V> KafkaShareConsumer<K, V> createShareConsumer(Deserializer<K> deserializer, Deserializer<V> deserializer2, String str) {
        Properties consumerProps = this.cluster.clientProperties();
        consumerProps.put("group.id", str);
        KafkaShareConsumer<K, V> shareConsumer = new KafkaShareConsumer<>(consumerProps, deserializer, deserializer2);
        return shareConsumer;
    }

    /**
     * Builds a share consumer from the cluster's client properties with {@code group.id} set and
     * the entries of {@code map} applied afterwards, so {@code map} wins on conflicting keys.
     *
     * @param deserializer key deserializer
     * @param deserializer2 value deserializer
     * @param str value for the {@code group.id} property
     * @param map additional consumer properties
     * @return a new share consumer; the caller is responsible for closing it
     */
    private <K, V> KafkaShareConsumer<K, V> createShareConsumer(Deserializer<K> deserializer, Deserializer<V> deserializer2, String str, Map<?, ?> map) {
        Properties consumerProps = this.cluster.clientProperties();
        consumerProps.put("group.id", str);
        // Applied after group.id on purpose: same order as before, overrides take precedence.
        consumerProps.putAll(map);
        KafkaShareConsumer<K, V> shareConsumer = new KafkaShareConsumer<>(consumerProps, deserializer, deserializer2);
        return shareConsumer;
    }

    /**
     * Creates the warm-up topic, waits until the first broker's metadata cache reports at least one
     * alive broker node on the {@code EXTERNAL} listener, then produces one record and waits for a
     * share consumer in {@code warmupgroup1} to receive it.
     *
     * @throws InterruptedException if interrupted while waiting for either condition
     */
    private void warmup() throws InterruptedException {
        createTopic(this.warmupTp.topic());
        TestUtils.waitForCondition(
            () -> !((BrokerServer) this.cluster.brokers().get(0)).metadataCache().getAliveBrokerNodes(new ListenerName("EXTERNAL")).isEmpty(),
            15000L, 100L,
            () -> "cache not up yet");

        ProducerRecord<byte[], byte[]> warmupMessage = new ProducerRecord<>(this.warmupTp.topic(), this.warmupTp.partition(), (Long) null, "key".getBytes(), "value".getBytes());
        Set<String> warmupTopics = Collections.singleton(this.warmupTp.topic());
        alterShareAutoOffsetReset("warmupgroup1", "earliest");

        // try-with-resources guarantees the consumer is closed on failure too (consumer first, then producer).
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "warmupgroup1")) {
            producer.send(warmupMessage);
            producer.flush();
            shareConsumer.subscribe(warmupTopics);
            TestUtils.waitForCondition(
                () -> shareConsumer.poll(Duration.ofMillis(5000L)).count() == 1,
                30000L, 200L,
                () -> "warmup record not received");
        }
    }

    /**
     * Sets the {@code share.auto.offset.reset} group config of group {@code str} to {@code str2}
     * through the admin client, waiting up to 60 seconds for the change to complete. Any exception
     * fails the test with "Failed to alter configs".
     *
     * @param str group id whose config is altered
     * @param str2 new value for {@code share.auto.offset.reset}
     */
    private void alterShareAutoOffsetReset(String str, String str2) {
        ConfigResource groupResource = new ConfigResource(ConfigResource.Type.GROUP, str);
        AlterConfigOp setResetPolicy = new AlterConfigOp(new ConfigEntry("share.auto.offset.reset", str2), AlterConfigOp.OpType.SET);
        AlterConfigsOptions options = new AlterConfigsOptions();
        Assertions.assertDoesNotThrow(
            () -> this.adminClient.incrementalAlterConfigs(Map.of(groupResource, List.of(setResetPolicy)), options).all().get(60L, TimeUnit.SECONDS),
            "Failed to alter configs");
    }
}
