package kafka.server.integration;

import java.io.File;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import kafka.integration.KafkaServerTestHarness;
import kafka.log.remote.RemoteLogReaderTest;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.utils.Logging;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.HashMap;

/* loaded from: input_file:kafka/server/integration/EligibleLeaderReplicasIntegrationTest.class */
public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarness implements Logging {
    private String bootstrapServer;
    private String testTopicName;
    private Admin adminClient;

    @Override // kafka.server.QuorumTestHarness
    public MetadataVersion metadataVersion() {
        return MetadataVersion.IBP_4_0_IV1;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    /* renamed from: generateConfigs */
    public Seq<KafkaConfig> mo33generateConfigs() {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs(5, true, true, Option.empty(), Option.empty(), Option.empty(), true, false, false, false, new HashMap(), 1, false, 1, (short) 4, 0, false)));
        ArrayList arrayList2 = new ArrayList();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            arrayList2.add(KafkaConfig.fromProps((Properties) it.next()));
        }
        return JavaConverters.asScalaBuffer(arrayList2).toSeq();
    }

    @Override // kafka.integration.KafkaServerTestHarness, kafka.server.QuorumTestHarness
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        super.setUp(testInfo);
        Properties properties = new Properties();
        this.bootstrapServer = bootstrapServers(listenerName());
        properties.put("bootstrap.servers", this.bootstrapServer);
        this.adminClient = Admin.create(properties);
        this.adminClient.updateFeatures(Map.of("eligible.leader.replicas.version", new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), new UpdateFeaturesOptions());
        this.testTopicName = String.format("%s-%s", ((Method) testInfo.getTestMethod().get()).getName(), "ELR-test");
    }

    @AfterEach
    public void close() throws Exception {
        if (this.adminClient != null) {
            this.adminClient.close();
        }
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest
    public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(String str) throws ExecutionException, InterruptedException {
        this.adminClient.createTopics(Collections.singletonList(new NewTopic(this.testTopicName, 1, (short) 4))).all().get();
        TestUtils.waitForPartitionMetadata(brokers(), this.testTopicName, 0, 1000L);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this.testTopicName);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new AlterConfigOp(new ConfigEntry("min.insync.replicas", "3"), AlterConfigOp.OpType.SET));
        this.adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, arrayList)).all().get();
        Producer producer = null;
        Consumer consumer = null;
        try {
            TopicPartitionInfo topicPartitionInfo = (TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            List replicas = topicPartitionInfo.replicas();
            Assertions.assertEquals(4, topicPartitionInfo.isr().size());
            Assertions.assertEquals(0, topicPartitionInfo.elr().size());
            Assertions.assertEquals(0, topicPartitionInfo.lastKnownElr().size());
            Properties properties = new Properties();
            properties.putIfAbsent("key.serializer", StringSerializer.class.getName());
            properties.putIfAbsent("value.serializer", StringSerializer.class.getName());
            properties.put("bootstrap.servers", this.bootstrapServer);
            properties.put("acks", "1");
            producer = new KafkaProducer(properties);
            Properties properties2 = new Properties();
            properties2.put("bootstrap.servers", this.bootstrapServer);
            properties2.put("group.id", RemoteLogReaderTest.TOPIC);
            properties2.put("fetch.max.wait.ms", "10");
            properties2.put("auto.offset.reset", "earliest");
            properties2.putIfAbsent("key.deserializer", StringDeserializer.class.getName());
            properties2.putIfAbsent("value.deserializer", StringDeserializer.class.getName());
            consumer = new KafkaConsumer(properties2);
            consumer.subscribe(Collections.singleton(this.testTopicName));
            producer.send(new ProducerRecord(this.testTopicName, "0", "0")).get();
            waitUntilOneMessageIsConsumed(consumer);
            killBroker(((Node) replicas.get(0)).id());
            killBroker(((Node) replicas.get(1)).id());
            waitForIsrAndElr((num, num2) -> {
                return Boolean.valueOf(num.intValue() == 2 && num2.intValue() == 1);
            });
            producer.send(new ProducerRecord(this.testTopicName, "1", "1")).get();
            Thread.sleep(100L);
            Assertions.assertEquals(0, consumer.poll(Duration.ofSeconds(1L)).count());
            startBroker(((Node) replicas.get(1)).id());
            startBroker(((Node) replicas.get(0)).id());
            waitForIsrAndElr((num3, num4) -> {
                return Boolean.valueOf(num3.intValue() == 4 && num4.intValue() == 0);
            });
            waitUntilOneMessageIsConsumed(consumer);
            restartDeadBrokers(false);
            if (consumer != null) {
                consumer.close();
            }
            if (producer != null) {
                producer.close();
            }
        } catch (Throwable th) {
            restartDeadBrokers(false);
            if (consumer != null) {
                consumer.close();
            }
            if (producer != null) {
                producer.close();
            }
            throw th;
        }
    }

    void waitUntilOneMessageIsConsumed(Consumer consumer) {
        TestUtils.waitUntilTrue(() -> {
            try {
                return Boolean.valueOf(consumer.poll(Duration.ofMillis(100L)).count() >= 1);
            } catch (Exception e) {
                return false;
            }
        }, () -> {
            return "fail to consume messages";
        }, 15000L, 100L);
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest
    public void testElrMemberCanBeElected(String str) throws ExecutionException, InterruptedException {
        this.adminClient.createTopics(Collections.singletonList(new NewTopic(this.testTopicName, 1, (short) 4))).all().get();
        TestUtils.waitForPartitionMetadata(brokers(), this.testTopicName, 0, 1000L);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this.testTopicName);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new AlterConfigOp(new ConfigEntry("min.insync.replicas", "3"), AlterConfigOp.OpType.SET));
        this.adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, arrayList)).all().get();
        try {
            TopicPartitionInfo topicPartitionInfo = (TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            List replicas = topicPartitionInfo.replicas();
            Assertions.assertEquals(4, topicPartitionInfo.isr().size());
            Assertions.assertEquals(0, topicPartitionInfo.elr().size());
            Assertions.assertEquals(0, topicPartitionInfo.lastKnownElr().size());
            killBroker(((Node) replicas.get(0)).id());
            killBroker(((Node) replicas.get(1)).id());
            killBroker(((Node) replicas.get(2)).id());
            waitForIsrAndElr((num, num2) -> {
                return Boolean.valueOf(num.intValue() == 1 && num2.intValue() == 2);
            });
            killBroker(((Node) replicas.get(3)).id());
            waitForIsrAndElr((num3, num4) -> {
                return Boolean.valueOf(num3.intValue() == 0 && num4.intValue() == 3);
            });
            TopicPartitionInfo topicPartitionInfo2 = (TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            Assertions.assertEquals(1, topicPartitionInfo2.lastKnownElr().size(), topicPartitionInfo2.toString());
            int id = ((Node) replicas.get(3)).id();
            Assertions.assertEquals(id, ((Node) topicPartitionInfo2.lastKnownElr().get(0)).id(), topicPartitionInfo2.toString());
            int id2 = ((Node) ((List) topicPartitionInfo2.elr().stream().filter(node -> {
                return node.id() != id;
            }).collect(Collectors.toList())).get(0)).id();
            startBroker(id2);
            waitForIsrAndElr((num5, num6) -> {
                return Boolean.valueOf(num5.intValue() == 1 && num6.intValue() == 2);
            });
            TopicPartitionInfo topicPartitionInfo3 = (TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            Assertions.assertEquals(0, topicPartitionInfo3.lastKnownElr().size(), topicPartitionInfo3.toString());
            Assertions.assertEquals(id2, topicPartitionInfo3.leader().id(), topicPartitionInfo3.toString());
            topicPartitionInfo3.replicas().stream().filter(node2 -> {
                return node2.id() != id2;
            }).limit(2L).forEach(node3 -> {
                startBroker(node3.id());
            });
            waitForIsrAndElr((num7, num8) -> {
                return Boolean.valueOf(num7.intValue() == 3 && num8.intValue() == 0);
            });
            TopicPartitionInfo topicPartitionInfo4 = (TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            Assertions.assertEquals(0, topicPartitionInfo4.lastKnownElr().size(), topicPartitionInfo4.toString());
            Assertions.assertEquals(id2, topicPartitionInfo4.leader().id(), topicPartitionInfo4.toString());
            restartDeadBrokers(false);
        } catch (Throwable th) {
            restartDeadBrokers(false);
            throw th;
        }
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest
    public void testElrMemberShouldBeKickOutWhenUncleanShutdown(String str) throws ExecutionException, InterruptedException {
        this.adminClient.createTopics(Collections.singletonList(new NewTopic(this.testTopicName, 1, (short) 4))).all().get();
        TestUtils.waitForPartitionMetadata(brokers(), this.testTopicName, 0, 1000L);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this.testTopicName);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new AlterConfigOp(new ConfigEntry("min.insync.replicas", "3"), AlterConfigOp.OpType.SET));
        this.adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, arrayList)).all().get();
        try {
            TopicPartitionInfo topicPartitionInfo = (TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            List replicas = topicPartitionInfo.replicas();
            Assertions.assertEquals(4, topicPartitionInfo.isr().size());
            Assertions.assertEquals(0, topicPartitionInfo.elr().size());
            Assertions.assertEquals(0, topicPartitionInfo.lastKnownElr().size());
            killBroker(((Node) replicas.get(0)).id());
            killBroker(((Node) replicas.get(1)).id());
            killBroker(((Node) replicas.get(2)).id());
            killBroker(((Node) replicas.get(3)).id());
            waitForIsrAndElr((num, num2) -> {
                return Boolean.valueOf(num.intValue() == 0 && num2.intValue() == 3);
            });
            int id = ((Node) ((TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0)).elr().get(0)).id();
            Seq liveLogDirs = ((KafkaBroker) brokers().find(kafkaBroker -> {
                return Boolean.valueOf(kafkaBroker.config().brokerId() == id);
            }).get()).logManager().liveLogDirs();
            Assertions.assertEquals(1, liveLogDirs.size());
            CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(((File) liveLogDirs.apply(0)).toString());
            Assertions.assertTrue(cleanShutdownFileHandler.exists());
            Assertions.assertDoesNotThrow(() -> {
                cleanShutdownFileHandler.delete();
            });
            startBroker(id);
            waitForIsrAndElr((num3, num4) -> {
                return Boolean.valueOf(num3.intValue() == 0 && num4.intValue() == 2);
            });
            TopicPartitionInfo topicPartitionInfo2 = (TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            Assertions.assertTrue(topicPartitionInfo2.leader() == null);
            Assertions.assertEquals(1, topicPartitionInfo2.lastKnownElr().size());
            restartDeadBrokers(false);
        } catch (Throwable th) {
            restartDeadBrokers(false);
            throw th;
        }
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest
    public void testLastKnownLeaderShouldBeElectedIfEmptyElr(String str) throws ExecutionException, InterruptedException {
        this.adminClient.createTopics(Collections.singletonList(new NewTopic(this.testTopicName, 1, (short) 4))).all().get();
        TestUtils.waitForPartitionMetadata(brokers(), this.testTopicName, 0, 1000L);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this.testTopicName);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new AlterConfigOp(new ConfigEntry("min.insync.replicas", "3"), AlterConfigOp.OpType.SET));
        this.adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, arrayList)).all().get();
        try {
            TopicPartitionInfo topicPartitionInfo = (TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            List replicas = topicPartitionInfo.replicas();
            Assertions.assertEquals(4, topicPartitionInfo.isr().size());
            Assertions.assertEquals(0, topicPartitionInfo.elr().size());
            Assertions.assertEquals(0, topicPartitionInfo.lastKnownElr().size());
            killBroker(((Node) replicas.get(0)).id());
            killBroker(((Node) replicas.get(1)).id());
            killBroker(((Node) replicas.get(2)).id());
            killBroker(((Node) replicas.get(3)).id());
            waitForIsrAndElr((num, num2) -> {
                return Boolean.valueOf(num.intValue() == 0 && num2.intValue() == 3);
            });
            TopicPartitionInfo topicPartitionInfo2 = (TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            int id = ((Node) topicPartitionInfo2.lastKnownElr().get(0)).id();
            Set set = (Set) replicas.stream().map(node -> {
                return Integer.valueOf(node.id());
            }).collect(Collectors.toSet());
            brokers().foreach(kafkaBroker -> {
                if (set.contains(Integer.valueOf(kafkaBroker.config().brokerId()))) {
                    Seq liveLogDirs = kafkaBroker.logManager().liveLogDirs();
                    Assertions.assertEquals(1, liveLogDirs.size());
                    CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(((File) liveLogDirs.apply(0)).toString());
                    Assertions.assertDoesNotThrow(() -> {
                        cleanShutdownFileHandler.delete();
                    });
                }
                return true;
            });
            topicPartitionInfo2.replicas().stream().forEach(node2 -> {
                if (node2.id() != id) {
                    startBroker(node2.id());
                }
            });
            waitForIsrAndElr((num3, num4) -> {
                return Boolean.valueOf(num3.intValue() == 0 && num4.intValue() == 1);
            });
            TopicPartitionInfo topicPartitionInfo3 = (TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            Assertions.assertTrue(topicPartitionInfo3.leader() == null);
            Assertions.assertEquals(1, topicPartitionInfo3.lastKnownElr().size());
            startBroker(id);
            waitForIsrAndElr((num5, num6) -> {
                return Boolean.valueOf(num5.intValue() > 0 && num6.intValue() == 0);
            });
            TestUtils.waitUntilTrue(() -> {
                try {
                    TopicPartitionInfo topicPartitionInfo4 = (TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
                    if (topicPartitionInfo4.leader() == null) {
                        return false;
                    }
                    return Boolean.valueOf(topicPartitionInfo4.lastKnownElr().isEmpty() && topicPartitionInfo4.elr().isEmpty() && topicPartitionInfo4.leader().id() == id);
                } catch (Exception e) {
                    return false;
                }
            }, () -> {
                return String.format("Partition metadata for %s is not correct", this.testTopicName);
            }, 15000L, 100L);
            restartDeadBrokers(false);
        } catch (Throwable th) {
            restartDeadBrokers(false);
            throw th;
        }
    }

    void waitForIsrAndElr(BiFunction<Integer, Integer, Boolean> biFunction) {
        TestUtils.waitUntilTrue(() -> {
            try {
                TopicPartitionInfo topicPartitionInfo = (TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
                return biFunction.apply(Integer.valueOf(topicPartitionInfo.isr().size()), Integer.valueOf(topicPartitionInfo.elr().size()));
            } catch (Exception e) {
                return false;
            }
        }, () -> {
            return String.format("Partition metadata for %s is not propagated", this.testTopicName);
        }, 15000L, 100L);
    }
}
