package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.easymock.EasyMock;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link InternalTopicManager}, run either against a {@link MockAdminClient} or against an
 * EasyMock-backed {@link AdminClient}.
 */
public class InternalTopicManagerTest {
    /** Name of the thread running the current test; captured in {@link #init()}. */
    private String threadName;
    /** Mock admin client used by most tests; re-created in {@link #init()}. */
    private MockAdminClient mockAdminClient;
    /** Topic manager under test, bound to {@link #mockAdminClient}; re-created in {@link #init()}. */
    private InternalTopicManager internalTopicManager;
    private final Node broker1 = new Node(0, "dummyHost-1", 1234);
    private final Node broker2 = new Node(1, "dummyHost-2", 1234);
    /** Mutable list holding both brokers; handed to the mock admin client as the cluster. */
    private final List<Node> cluster = clusterOf(this.broker1, this.broker2);
    private final String topic = "test_topic";
    private final String topic2 = "test_topic_2";
    private final String topic3 = "test_topic_3";
    /** Replica list that contains {@link #broker1} only. */
    private final List<Node> singleReplica = Collections.singletonList(this.broker1);
    /** Properties from which the tests build their {@link StreamsConfig}. */
    private final Map<String, Object> config = streamsProperties(this.broker1);

    /**
     * Builds a plain, mutable two-element broker list.
     *
     * <p>Used instead of double-brace initialization so that no anonymous {@code ArrayList}
     * subclass capturing the test instance is created.
     */
    private static List<Node> clusterOf(final Node first, final Node second) {
        final List<Node> brokers = new ArrayList<>(2);
        brokers.add(first);
        brokers.add(second);
        return brokers;
    }

    /**
     * Builds the Streams properties shared by the tests.
     *
     * @param bootstrapBroker broker whose {@code host:port} becomes the bootstrap server
     * @return a mutable map of configuration entries
     */
    private static Map<String, Object> streamsProperties(final Node bootstrapBroker) {
        final Map<String, Object> properties = new HashMap<>();
        properties.put("application.id", "app-id");
        properties.put("bootstrap.servers", bootstrapBroker.host() + ":" + bootstrapBroker.port());
        properties.put("replication.factor", 1);
        properties.put(StreamsConfig.producerPrefix("batch.size"), 16384);
        properties.put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), 100);
        properties.put("retry.backoff.ms", 50);
        return properties;
    }

    /**
     * {@link CreateTopicsResult} subclass that lets the tests construct a result directly from a
     * map of per-topic futures.
     */
    private static class MockCreateTopicsResult extends CreateTopicsResult {
        MockCreateTopicsResult(final Map<String, KafkaFuture<CreateTopicsResult.TopicMetadataAndConfig>> futures) {
            super(futures);
        }
    }

    /**
     * {@link DescribeTopicsResult} subclass that lets the tests construct a result directly from a
     * map of per-topic futures.
     */
    private static class MockDescribeTopicsResult extends DescribeTopicsResult {
        MockDescribeTopicsResult(final Map<String, KafkaFuture<TopicDescription>> futures) {
            super(futures);
        }
    }

    /**
     * Captures the current thread name and builds a fresh mock admin client and topic manager
     * before each test.
     */
    @Before
    public void init() {
        threadName = Thread.currentThread().getName();
        mockAdminClient = new MockAdminClient(cluster, broker1);

        final StreamsConfig streamsConfig = new StreamsConfig(config);
        internalTopicManager = new InternalTopicManager(Time.SYSTEM, mockAdminClient, streamsConfig);
    }

    /** Closes the mock admin client created in {@link #init()}. */
    @After
    public void shutdown() {
        mockAdminClient.close();
    }

    /**
     * Registers a topic with one partition on the mock admin client and expects
     * {@code getNumPartitions} to report a count of one for it.
     */
    @Test
    public void shouldReturnCorrectPartitionCounts() {
        final TopicPartitionInfo onlyPartition =
            new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList());
        mockAdminClient.addTopic(false, topic, Collections.singletonList(onlyPartition), null);

        final Map<String, Integer> expectedCounts = Collections.singletonMap(topic, 1);
        Assert.assertEquals(
            expectedCounts,
            internalTopicManager.getNumPartitions(Collections.singleton(topic), Collections.emptySet()));
    }

    /**
     * Makes a repartition, an unwindowed-changelog and a windowed-changelog topic ready and then
     * checks, through the mock admin client, that all three exist with one partition each and
     * with the expected {@code cleanup.policy}.
     *
     * @throws Exception if resolving one of the admin-client futures fails
     */
    @Test
    public void shouldCreateRequiredTopics() throws Exception {
        final RepartitionTopicConfig repartitionConfig =
            new RepartitionTopicConfig(topic, Collections.emptyMap());
        repartitionConfig.setNumberOfPartitions(1);
        final UnwindowedChangelogTopicConfig unwindowedConfig =
            new UnwindowedChangelogTopicConfig(topic2, Collections.emptyMap());
        unwindowedConfig.setNumberOfPartitions(1);
        final WindowedChangelogTopicConfig windowedConfig =
            new WindowedChangelogTopicConfig(topic3, Collections.emptyMap());
        windowedConfig.setNumberOfPartitions(1);

        internalTopicManager.makeReady(Collections.singletonMap(topic, repartitionConfig));
        internalTopicManager.makeReady(Collections.singletonMap(topic2, unwindowedConfig));
        internalTopicManager.makeReady(Collections.singletonMap(topic3, windowedConfig));

        Assert.assertEquals(Utils.mkSet(topic, topic2, topic3), mockAdminClient.listTopics().names().get());

        assertDescribedWithSinglePartition(topic);
        assertDescribedWithSinglePartition(topic2);
        assertDescribedWithSinglePartition(topic3);

        assertCleanupPolicy(topic, "delete");
        assertCleanupPolicy(topic2, "compact");
        assertCleanupPolicy(topic3, "compact,delete");
    }

    /**
     * Asserts that the mock admin client describes {@code topicName} as a non-internal topic with
     * exactly one partition led by {@code broker1} and replicated on {@code singleReplica}.
     */
    private void assertDescribedWithSinglePartition(final String topicName) throws Exception {
        final TopicPartitionInfo expectedPartition =
            new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList());
        final TopicDescription expected =
            new TopicDescription(topicName, false, Collections.singletonList(expectedPartition));

        final TopicDescription actual =
            mockAdminClient.describeTopics(Collections.singleton(topicName)).values().get(topicName).get();

        Assert.assertEquals(expected, actual);
    }

    /**
     * Asserts that the mock admin client reports {@code expectedPolicy} as the
     * {@code cleanup.policy} of {@code topicName}.
     */
    private void assertCleanupPolicy(final String topicName, final String expectedPolicy) throws Exception {
        final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);

        final Config topicConfig =
            mockAdminClient.describeConfigs(Collections.singleton(resource)).values().get(resource).get();

        Assert.assertEquals(new ConfigEntry("cleanup.policy", expectedPolicy), topicConfig.get("cleanup.policy"));
    }

    /**
     * Scripts an admin mock so that the first describe call succeeds for {@code topic} and fails
     * with {@link UnknownTopicOrPartitionException} for {@code topic2}, the create call for
     * {@code topic2} fails with {@link TopicExistsException}, and a follow-up describe of
     * {@code topic2} alone succeeds; {@code makeReady} must then complete and every scripted
     * call must have been made.
     */
    @Test
    public void shouldCompleteTopicValidationOnRetry() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final InternalTopicManager topicManager =
            new InternalTopicManager(Time.SYSTEM, admin, new StreamsConfig(config));
        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(
            0, broker1, Collections.singletonList(broker1), Collections.singletonList(broker1));

        final KafkaFutureImpl<TopicDescription> describeSuccess = new KafkaFutureImpl<>();
        describeSuccess.complete(
            new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet()));
        final KafkaFutureImpl<TopicDescription> describeFailure = new KafkaFutureImpl<>();
        describeFailure.completeExceptionally(new UnknownTopicOrPartitionException("KABOOM!"));
        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> createFailure = new KafkaFutureImpl<>();
        createFailure.completeExceptionally(new TopicExistsException("KABOOM!"));

        EasyMock.expect(admin.describeTopics(Utils.mkSet(topic, topic2)))
            .andReturn(new MockDescribeTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic, describeSuccess),
                Utils.mkEntry(topic2, describeFailure))))
            .once();
        EasyMock.expect(admin.createTopics(Collections.singleton(
                new NewTopic(topic2, Optional.of(1), Optional.of((short) 1))
                    .configs(Utils.mkMap(
                        Utils.mkEntry("cleanup.policy", "compact"),
                        Utils.mkEntry("message.timestamp.type", "CreateTime"))))))
            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(topic2, createFailure)))
            .once();
        // The retry for topic2 reuses the already-completed successful description future.
        EasyMock.expect(admin.describeTopics(Collections.singleton(topic2)))
            .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic2, describeSuccess)));
        EasyMock.replay(admin);

        final UnwindowedChangelogTopicConfig firstConfig =
            new UnwindowedChangelogTopicConfig(topic, Collections.emptyMap());
        firstConfig.setNumberOfPartitions(1);
        final UnwindowedChangelogTopicConfig secondConfig =
            new UnwindowedChangelogTopicConfig(topic2, Collections.emptyMap());
        secondConfig.setNumberOfPartitions(1);

        topicManager.makeReady(Utils.mkMap(
            Utils.mkEntry(topic, firstConfig),
            Utils.mkEntry(topic2, secondConfig)));

        EasyMock.verify(admin);
    }

    /**
     * Registers {@code topic} with two partitions and then asks for it with one partition;
     * {@code makeReady} is expected to throw a {@link StreamsException}.
     */
    @Test
    public void shouldNotCreateTopicIfExistsWithDifferentPartitions() {
        final List<TopicPartitionInfo> existingPartitions = new ArrayList<>(2);
        existingPartitions.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
        existingPartitions.add(new TopicPartitionInfo(1, broker1, singleReplica, Collections.emptyList()));
        mockAdminClient.addTopic(false, topic, existingPartitions, null);

        final RepartitionTopicConfig requested = new RepartitionTopicConfig(topic, Collections.emptyMap());
        requested.setNumberOfPartitions(1);

        // assertThrows fails the test when no StreamsException is raised, replacing the
        // former try / fail / empty-catch construct.
        Assert.assertThrows(
            StreamsException.class,
            () -> internalTopicManager.makeReady(Collections.singletonMap(topic, requested)));
    }

    /**
     * Registers {@code topic} with a partition replicated on both brokers, while the shared config
     * sets {@code replication.factor} to 1, and expects {@code makeReady} to return normally.
     */
    @Test
    public void shouldNotThrowExceptionIfExistsWithDifferentReplication() {
        final TopicPartitionInfo twoReplicaPartition =
            new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList());
        mockAdminClient.addTopic(false, topic, Collections.singletonList(twoReplicaPartition), null);

        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final InternalTopicManager topicManager =
            new InternalTopicManager(Time.SYSTEM, mockAdminClient, streamsConfig);

        final RepartitionTopicConfig requested = new RepartitionTopicConfig(topic, Collections.emptyMap());
        requested.setNumberOfPartitions(1);
        topicManager.makeReady(Collections.singletonMap(topic, requested));
    }

    /** Calls {@code makeReady} with an empty topic map; the test passes when no exception escapes. */
    @Test
    public void shouldNotThrowExceptionForEmptyTopicMap() {
        internalTopicManager.makeReady(Collections.emptyMap());
    }

    /**
     * Tells the mock admin client to time out its next request and expects {@code makeReady} to
     * throw a {@link StreamsException} whose cause is exactly a {@link TimeoutException}.
     */
    @Test
    public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() {
        mockAdminClient.timeoutNextRequest(1);

        final RepartitionTopicConfig requested = new RepartitionTopicConfig(topic, Collections.emptyMap());
        requested.setNumberOfPartitions(1);

        // assertThrows fails the test when nothing is thrown and hands back the exception for
        // further checks, replacing the former try / fail / catch construct.
        final StreamsException thrown = Assert.assertThrows(
            StreamsException.class,
            () -> internalTopicManager.makeReady(Collections.singletonMap(topic, requested)));

        Assert.assertEquals(TimeoutException.class, thrown.getCause().getClass());
    }

    /**
     * Registers {@code topic} only, then makes both {@code topic} and the unregistered
     * {@code internal-topic} ready while capturing the {@link InternalTopicManager} log; the
     * captured messages must contain the "unknown or not found" entry for {@code internal-topic}.
     */
    @Test
    public void shouldLogWhenTopicNotFoundAndNotThrowException() {
        mockAdminClient.addTopic(
            false,
            topic,
            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
            null);

        final RepartitionTopicConfig existingTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap());
        existingTopicConfig.setNumberOfPartitions(1);
        final RepartitionTopicConfig missingTopicConfig =
            new RepartitionTopicConfig("internal-topic", Collections.emptyMap());
        missingTopicConfig.setNumberOfPartitions(1);

        final Map<String, InternalTopicConfig> topicConfigs = new HashMap<>();
        topicConfigs.put(topic, existingTopicConfig);
        topicConfigs.put("internal-topic", missingTopicConfig);

        LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
        // try-with-resources closes the appender exactly once, whether or not the body throws.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(InternalTopicManager.class)) {
            internalTopicManager.makeReady(topicConfigs);

            MatcherAssert.assertThat(
                appender.getMessages(),
                Matchers.hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n"
                    + "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found."));
        }
    }

    /**
     * Scripts an admin mock whose first describe of {@code topic} fails with
     * {@link LeaderNotAvailableException} and whose second describe fails with
     * {@link UnknownTopicOrPartitionException}; an empty create call and then a create call for
     * {@code topic} with the repartition-topic settings are expected. {@code makeReady} must
     * complete and every scripted call must have been made.
     */
    @Test
    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final InternalTopicManager topicManager =
            new InternalTopicManager(Time.SYSTEM, admin, new StreamsConfig(config));

        final KafkaFutureImpl<TopicDescription> leaderNotAvailable = new KafkaFutureImpl<>();
        leaderNotAvailable.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
        final KafkaFutureImpl<TopicDescription> unknownTopic = new KafkaFutureImpl<>();
        unknownTopic.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> createSuccess = new KafkaFutureImpl<>();
        createSuccess.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));

        EasyMock.expect(admin.describeTopics(Collections.singleton(topic)))
            .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic, leaderNotAvailable)))
            .once();
        EasyMock.expect(admin.createTopics(Collections.emptySet()))
            .andReturn(new MockCreateTopicsResult(Collections.emptyMap()))
            .once();
        EasyMock.expect(admin.describeTopics(Collections.singleton(topic)))
            .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic, unknownTopic)))
            .once();
        EasyMock.expect(admin.createTopics(Collections.singleton(
                new NewTopic(topic, Optional.of(1), Optional.of((short) 1))
                    .configs(Utils.mkMap(
                        Utils.mkEntry("cleanup.policy", "delete"),
                        Utils.mkEntry("message.timestamp.type", "CreateTime"),
                        Utils.mkEntry("segment.bytes", "52428800"),
                        Utils.mkEntry("retention.ms", "-1"))))))
            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(topic, createSuccess)))
            .once();
        EasyMock.replay(admin);

        final RepartitionTopicConfig requested = new RepartitionTopicConfig(topic, Collections.emptyMap());
        requested.setNumberOfPartitions(1);
        topicManager.makeReady(Collections.singletonMap(topic, requested));

        EasyMock.verify(admin);
    }

    /**
     * Scripts an admin mock whose first describe of {@code topic} fails with
     * {@link LeaderNotAvailableException} and whose second describe returns a one-partition
     * description; only an empty create call is expected. {@code makeReady} must complete and
     * every scripted call must have been made.
     */
    @Test
    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final InternalTopicManager topicManager =
            new InternalTopicManager(Time.SYSTEM, admin, new StreamsConfig(config));
        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(
            0, broker1, Collections.singletonList(broker1), Collections.singletonList(broker1));

        final KafkaFutureImpl<TopicDescription> leaderNotAvailable = new KafkaFutureImpl<>();
        leaderNotAvailable.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
        final KafkaFutureImpl<TopicDescription> describeSuccess = new KafkaFutureImpl<>();
        describeSuccess.complete(
            new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet()));

        EasyMock.expect(admin.describeTopics(Collections.singleton(topic)))
            .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic, leaderNotAvailable)))
            .once();
        EasyMock.expect(admin.createTopics(Collections.emptySet()))
            .andReturn(new MockCreateTopicsResult(Collections.emptyMap()))
            .once();
        EasyMock.expect(admin.describeTopics(Collections.singleton(topic)))
            .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic, describeSuccess)))
            .once();
        EasyMock.replay(admin);

        final RepartitionTopicConfig requested = new RepartitionTopicConfig(topic, Collections.emptyMap());
        requested.setNumberOfPartitions(1);
        topicManager.makeReady(Collections.singletonMap(topic, requested));

        EasyMock.verify(admin);
    }

    /**
     * Scripts an admin mock whose describe of {@code topic} always fails with
     * {@link LeaderNotAvailableException}; {@code makeReady} is expected to throw a cause-less
     * {@link TimeoutException} carrying the "Could not create topics within 50 milliseconds"
     * message.
     */
    @Test
    public void shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final InternalTopicManager topicManager =
            new InternalTopicManager(Time.SYSTEM, admin, new StreamsConfig(config));

        final KafkaFutureImpl<TopicDescription> leaderNotAvailable = new KafkaFutureImpl<>();
        leaderNotAvailable.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));

        EasyMock.expect(admin.describeTopics(Collections.singleton(topic)))
            .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic, leaderNotAvailable)))
            .anyTimes();
        EasyMock.expect(admin.createTopics(Collections.emptySet()))
            .andReturn(new MockCreateTopicsResult(Collections.emptyMap()))
            .anyTimes();
        EasyMock.replay(admin);

        final RepartitionTopicConfig requested = new RepartitionTopicConfig(topic, Collections.emptyMap());
        requested.setNumberOfPartitions(1);

        final TimeoutException timeout = Assert.assertThrows(
            TimeoutException.class,
            () -> topicManager.makeReady(Collections.singletonMap(topic, requested)));

        Assert.assertNull(timeout.getCause());
        MatcherAssert.assertThat(
            timeout.getMessage(),
            Matchers.equalTo("Could not create topics within 50 milliseconds. This can happen if the Kafka cluster is temporarily not available."));

        EasyMock.verify(admin);
    }

    /**
     * Registers {@code topic}, marks it for deletion on the mock admin client and expects
     * {@code makeReady} to throw a cause-less {@link TimeoutException} carrying the
     * "Could not create topics within 50 milliseconds" message.
     */
    @Test
    public void shouldExhaustRetriesOnMarkedForDeletionTopic() {
        final TopicPartitionInfo partition = new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList());
        mockAdminClient.addTopic(false, topic, Collections.singletonList(partition), null);
        mockAdminClient.markTopicForDeletion(topic);

        final RepartitionTopicConfig requested = new RepartitionTopicConfig(topic, Collections.emptyMap());
        requested.setNumberOfPartitions(1);

        final TimeoutException timeout = Assert.assertThrows(
            TimeoutException.class,
            () -> internalTopicManager.makeReady(Collections.singletonMap(topic, requested)));

        Assert.assertNull(timeout.getCause());
        MatcherAssert.assertThat(
            timeout.getMessage(),
            Matchers.equalTo("Could not create topics within 50 milliseconds. This can happen if the Kafka cluster is temporarily not available."));
    }
}
