package org.apache.kafka.streams.integration;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.log.LogConfig;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import scala.Tuple2;
import scala.collection.Iterator;

/* loaded from: input_file:org/apache/kafka/streams/integration/InternalTopicIntegrationTest.class */
public class InternalTopicIntegrationTest {

    @ClassRule
    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10000;
    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8000;

    @BeforeClass
    public static void startKafkaCluster() throws Exception {
        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
    }

    private boolean isUsingCompactionForStateChangelogTopics() {
        boolean z = true;
        ZkClient zkClient = new ZkClient(CLUSTER.zKConnectString(), DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
        Iterator it = AdminUtils.fetchAllTopicConfigs(new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), false)).iterator();
        while (it.hasNext()) {
            Tuple2 tuple2 = (Tuple2) it.next();
            String str = (String) tuple2._1;
            Properties properties = (Properties) tuple2._2;
            if (str.endsWith("-changelog") && (!properties.containsKey(LogConfig.CleanupPolicyProp()) || !properties.getProperty(LogConfig.CleanupPolicyProp()).equals(LogConfig.Compact()))) {
                z = false;
                break;
            }
        }
        zkClient.close();
        return z;
    }

    @Test
    public void shouldCompactTopicsForStateChangelogs() throws Exception {
        List asList = Arrays.asList("hello", "world", "world", "hello world");
        Serde String = Serdes.String();
        Serde Long = Serdes.Long();
        Properties properties = new Properties();
        properties.put("application.id", "compact-topics-integration-test");
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("zookeeper.connect", CLUSTER.zKConnectString());
        properties.put("key.serde", Serdes.String().getClass().getName());
        properties.put("value.serde", Serdes.String().getClass().getName());
        properties.put("state.dir", "/tmp/kafka-streams");
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        kStreamBuilder.stream(new String[]{DEFAULT_INPUT_TOPIC}).flatMapValues(new ValueMapper<String, Iterable<String>>() { // from class: org.apache.kafka.streams.integration.InternalTopicIntegrationTest.2
            public Iterable<String> apply(String str) {
                return Arrays.asList(str.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { // from class: org.apache.kafka.streams.integration.InternalTopicIntegrationTest.1
            public KeyValue<String, String> apply(String str, String str2) {
                return new KeyValue<>(str2, str2);
            }
        }).countByKey("Counts").toStream().to(String, Long, DEFAULT_OUTPUT_TOPIC);
        IntegrationTestUtils.purgeLocalStreamsState(properties);
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, properties);
        kafkaStreams.start();
        Properties properties2 = new Properties();
        properties2.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties2.put("acks", "all");
        properties2.put("retries", 0);
        properties2.put("key.serializer", StringSerializer.class);
        properties2.put("value.serializer", StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, asList, properties2);
        kafkaStreams.close();
        Assert.assertEquals(Boolean.valueOf(isUsingCompactionForStateChangelogTopics()), true);
    }
}
