package kafka.clients.consumer;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
import org.junit.jupiter.api.Assertions;

/* loaded from: input_file:kafka/clients/consumer/ConsumerIntegrationTest.class */
public class ConsumerIntegrationTest {
    @ClusterTests({@ClusterTest(serverProperties = {@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), @ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false")}), @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic")})})
    public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance clusterInstance) throws Exception {
        clusterInstance.createTopic("test-topic", 1, (short) 1);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(Map.of("bootstrap.servers", clusterInstance.bootstrapServers(), "group.id", "test-group", "key.deserializer", StringDeserializer.class.getName(), "value.deserializer", StringDeserializer.class.getName(), "group.protocol", GroupProtocol.CONSUMER.name()));
        try {
            kafkaConsumer.subscribe(Collections.singletonList("test-topic"));
            TestUtils.waitForCondition(() -> {
                try {
                    kafkaConsumer.poll(Duration.ofMillis(1000L));
                    return false;
                } catch (UnsupportedVersionException e) {
                    return Boolean.valueOf(e.getMessage().equals("The cluster does not support the new CONSUMER group protocol. Set group.protocol=classic on the consumer configs to revert to the CLASSIC protocol until the cluster is upgraded."));
                }
            }, "Should get UnsupportedVersionException and how to revert to classic protocol");
            kafkaConsumer.close();
        } catch (Throwable th) {
            try {
                kafkaConsumer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")})
    public void testFetchPartitionsAfterFailedListenerWithGroupProtocolClassic(ClusterInstance clusterInstance) throws InterruptedException {
        testFetchPartitionsAfterFailedListener(clusterInstance, GroupProtocol.CLASSIC);
    }

    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")})
    public void testFetchPartitionsAfterFailedListenerWithGroupProtocolConsumer(ClusterInstance clusterInstance) throws InterruptedException {
        testFetchPartitionsAfterFailedListener(clusterInstance, GroupProtocol.CONSUMER);
    }

    private static void testFetchPartitionsAfterFailedListener(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException {
        Producer producer = clusterInstance.producer(Map.of("key.serializer", ByteArraySerializer.class, "value.serializer", ByteArraySerializer.class));
        try {
            producer.send(new ProducerRecord("topic", "key".getBytes(), "value".getBytes()));
            if (producer != null) {
                producer.close();
            }
            Consumer consumer = clusterInstance.consumer(Map.of("group.protocol", groupProtocol.name()));
            try {
                consumer.subscribe(List.of("topic"), new ConsumerRebalanceListener() { // from class: kafka.clients.consumer.ConsumerIntegrationTest.1
                    private int count = 0;

                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    }

                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                        this.count++;
                        if (this.count == 1) {
                            throw new IllegalArgumentException("temporary error");
                        }
                    }
                });
                TestUtils.waitForCondition(() -> {
                    return Boolean.valueOf(consumer.poll(Duration.ofSeconds(1L)).count() == 1);
                }, 5000L, "failed to poll data");
                if (consumer != null) {
                    consumer.close();
                }
            } catch (Throwable th) {
                if (consumer != null) {
                    try {
                        consumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (producer != null) {
                try {
                    producer.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")})
    public void testFetchPartitionsWithAlwaysFailedListenerWithGroupProtocolClassic(ClusterInstance clusterInstance) throws InterruptedException {
        testFetchPartitionsWithAlwaysFailedListener(clusterInstance, GroupProtocol.CLASSIC);
    }

    @ClusterTest(serverProperties = {@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")})
    public void testFetchPartitionsWithAlwaysFailedListenerWithGroupProtocolConsumer(ClusterInstance clusterInstance) throws InterruptedException {
        testFetchPartitionsWithAlwaysFailedListener(clusterInstance, GroupProtocol.CONSUMER);
    }

    private static void testFetchPartitionsWithAlwaysFailedListener(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException {
        Producer producer = clusterInstance.producer(Map.of("key.serializer", ByteArraySerializer.class, "value.serializer", ByteArraySerializer.class));
        try {
            producer.send(new ProducerRecord("topic", "key".getBytes(), "value".getBytes()));
            if (producer != null) {
                producer.close();
            }
            Consumer consumer = clusterInstance.consumer(Map.of("group.protocol", groupProtocol.name()));
            try {
                consumer.subscribe(List.of("topic"), new ConsumerRebalanceListener() { // from class: kafka.clients.consumer.ConsumerIntegrationTest.2
                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    }

                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                        throw new IllegalArgumentException("always failed");
                    }
                });
                long currentTimeMillis = System.currentTimeMillis();
                long currentTimeMillis2 = System.currentTimeMillis();
                while (currentTimeMillis2 < currentTimeMillis + 3000) {
                    currentTimeMillis2 = System.currentTimeMillis();
                    try {
                        Assertions.assertEquals(0, consumer.poll(Duration.ofSeconds(1L)).count());
                    } catch (KafkaException e) {
                        Assertions.assertEquals("User rebalance callback throws an error", e.getMessage());
                    }
                    Thread.sleep(300L);
                }
                if (consumer != null) {
                    consumer.close();
                }
            } catch (Throwable th) {
                if (consumer != null) {
                    try {
                        consumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (producer != null) {
                try {
                    producer.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }
}
