package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.class */
public class StreamPartitionAssignorTest {
    // Topic partitions referenced by the tests: three each for topic1 and topic2, four for topic3.
    private TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private TopicPartition t2p0 = new TopicPartition("topic2", 0);
    private TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private TopicPartition t3p0 = new TopicPartition("topic3", 0);
    private TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private TopicPartition t3p2 = new TopicPartition("topic3", 2);
    private TopicPartition t3p3 = new TopicPartition("topic3", 3);

    // Note: topic3 is deliberately not part of this set.
    private Set<String> allTopics = Utils.mkSet("topic1", "topic2");

    // One PartitionInfo per partition declared above; no leader, replicas or ISR are set.
    private List<PartitionInfo> infos = Arrays.asList(
            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));

    // Cluster metadata exposing every partition listed in `infos`.
    private Cluster metadata = new Cluster(
            "cluster",
            Collections.singletonList(Node.noNode()),
            infos,
            Collections.emptySet(),
            Collections.emptySet());

    // Task ids of topic group 0, one per partition number 0..3.
    private final TaskId task0 = new TaskId(0, 0);
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(0, 3);

    private final String userEndPoint = "localhost:2171";

    // Shared collaborators; JUnit creates a fresh test instance (and therefore fresh copies) per test method.
    private final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private final TopologyBuilder builder = new TopologyBuilder();
    private final StreamsConfig config = new StreamsConfig(configProps());

    /**
     * Builds the baseline Streams configuration shared by the tests.
     *
     * <p>A plain {@link Properties} instance is returned (rather than an anonymous subclass created
     * through double-brace initialisation) so the result does not capture a reference to the
     * enclosing test instance. Callers may freely add further entries to the returned object.
     *
     * @return a new, mutable {@link Properties} holding the application id, bootstrap servers,
     *         buffered-records limit and timestamp extractor used by this test class
     */
    private Properties configProps() {
        final Properties props = new Properties();
        props.setProperty("application.id", "stream-partition-assignor-test");
        props.setProperty("bootstrap.servers", "localhost:2171");
        props.setProperty("buffered.records.per.partition", "3");
        props.setProperty("timestamp.extractor", MockTimestampExtractor.class.getName());
        return props;
    }

    /**
     * Checks the subscription built by the assignor: it must list the subscribed topics and carry
     * user data equal to an encoded {@link SubscriptionInfo} made of the process id, the thread's
     * {@code prevTasks()} and its {@code cachedTasks()} minus {@code prevTasks()}.
     */
    @Test
    public void testSubscription() throws Exception {
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        final Set<TaskId> prevTasks = Utils.mkSet(
                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
        final Set<TaskId> cachedTasks = Utils.mkSet(
                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
                new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));

        final UUID processId = UUID.randomUUID();

        // Override the task accessors so the subscription content is fully controlled by this test.
        final StreamThread thread = new StreamThread(
                builder, config, new MockClientSupplier(), "test", "client-id", processId,
                new Metrics(), Time.SYSTEM,
                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L) {
            @Override
            public Set<TaskId> prevTasks() {
                return prevTasks;
            }

            @Override
            public Set<TaskId> cachedTasks() {
                return cachedTasks;
            }
        };
        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", "client-id"));

        final PartitionAssignor.Subscription subscription =
                partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));

        // Sort in place so the comparison does not depend on the order the assignor produced.
        Collections.sort(subscription.topics());
        Assert.assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics());

        final Set<TaskId> cachedButNotPrev = new HashSet<TaskId>(cachedTasks);
        cachedButNotPrev.removeAll(prevTasks);
        final SubscriptionInfo expectedInfo =
                new SubscriptionInfo(processId, prevTasks, cachedButNotPrev, (String) null);
        Assert.assertEquals(expectedInfo.encode(), subscription.userData());
    }

    /**
     * Assigns two source topics across three consumers belonging to two process ids and verifies
     * both the partitions and the active tasks each consumer receives.
     */
    @Test
    public void testAssignBasic() throws Exception {
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        final List<String> topics = Utils.mkList("topic1", "topic2");
        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
        final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
        final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);

        final UUID uuid1 = UUID.randomUUID();
        final UUID uuid2 = UUID.randomUUID();

        final StreamThread thread10 = new StreamThread(
                builder, config, mockClientSupplier, "test", "client1", uuid1,
                new Metrics(), Time.SYSTEM,
                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", "client1"));
        partitionAssignor.setInternalTopicManager(
                new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

        // consumer10 and consumer11 share a process id; consumer20 uses a different one.
        final Map<String, PartitionAssignor.Subscription> subscriptions =
                new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));

        final Map<String, PartitionAssignor.Assignment> assignments =
                partitionAssignor.assign(metadata, subscriptions);
        final PartitionAssignor.Assignment assignment10 = assignments.get("consumer10");
        final PartitionAssignor.Assignment assignment11 = assignments.get("consumer11");
        final PartitionAssignor.Assignment assignment20 = assignments.get("consumer20");

        // Which of consumer10/consumer11 gets which pair is not pinned down, only the pair of pairs.
        Assert.assertEquals(
                Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
                Utils.mkSet(
                        new HashSet<TopicPartition>(assignment10.partitions()),
                        new HashSet<TopicPartition>(assignment11.partitions())));
        Assert.assertEquals(
                Utils.mkSet(t1p2, t2p2),
                new HashSet<TopicPartition>(assignment20.partitions()));

        final Set<TaskId> allActiveTasks = new HashSet<TaskId>();

        allActiveTasks.addAll(checkAssignment(allTopics, assignment10).activeTasks);
        allActiveTasks.addAll(checkAssignment(allTopics, assignment11).activeTasks);
        Assert.assertEquals(Utils.mkSet(task0, task1), allActiveTasks);

        allActiveTasks.addAll(checkAssignment(allTopics, assignment20).activeTasks);
        Assert.assertEquals(3, allActiveTasks.size());
        Assert.assertEquals(allTasks, new HashSet<TaskId>(allActiveTasks));

        Assert.assertEquals(3, allActiveTasks.size());
        Assert.assertEquals(allTasks, allActiveTasks);
    }

    /**
     * Runs an assignment with the {@code partition.grouper} setting pointed at
     * {@code SingleGroupPartitionGrouperStub} and verifies that the single consumer's assignment,
     * checked against topic1 only, contains three active tasks (task0..task2).
     */
    @Test
    public void testAssignWithPartialTopology() throws Exception {
        final Properties props = configProps();
        props.put("partition.grouper", SingleGroupPartitionGrouperStub.class);
        final StreamsConfig partialConfig = new StreamsConfig(props);

        // Two independent sub-topologies, each with its own state store.
        builder.addSource("source1", "topic1");
        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor2");

        final List<String> topics = Utils.mkList("topic1", "topic2");
        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
        final UUID uuid1 = UUID.randomUUID();

        final StreamThread thread10 = new StreamThread(
                builder, partialConfig, mockClientSupplier, "test", "client1", uuid1,
                new Metrics(), Time.SYSTEM,
                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(partialConfig.getConsumerConfigs(thread10, "test", "client1"));
        partitionAssignor.setInternalTopicManager(
                new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

        final Set<TaskId> noTasks = Collections.emptySet();
        final Map<String, PartitionAssignor.Subscription> subscriptions =
                new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid1, noTasks, noTasks, userEndPoint).encode()));

        final Map<String, PartitionAssignor.Assignment> assignments =
                partitionAssignor.assign(metadata, subscriptions);

        final Set<TaskId> allActiveTasks = new HashSet<TaskId>();
        final AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);

        Assert.assertEquals(3, allActiveTasks.size());
        Assert.assertEquals(allTasks, new HashSet<TaskId>(allActiveTasks));
    }

    /**
     * First assigns against cluster metadata that contains no partitions and expects an empty
     * assignment, then repeats the assignment with the full metadata and expects the single
     * consumer to receive every partition of topic1/topic2 and all three tasks.
     */
    @Test
    public void testAssignEmptyMetadata() throws Exception {
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        final List<String> topics = Utils.mkList("topic1", "topic2");
        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);

        final Cluster emptyMetadata = new Cluster(
                "cluster",
                Collections.singletonList(Node.noNode()),
                Collections.emptySet(),
                Collections.emptySet(),
                Collections.emptySet());
        final UUID uuid1 = UUID.randomUUID();

        final StreamThread thread10 = new StreamThread(
                builder, config, new MockClientSupplier(), "test", "client1", uuid1,
                new Metrics(), Time.SYSTEM,
                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", "client1"));

        final Map<String, PartitionAssignor.Subscription> subscriptions =
                new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));

        // Round one: nothing is known about the topics, so nothing can be assigned.
        final Map<String, PartitionAssignor.Assignment> emptyAssignments =
                partitionAssignor.assign(emptyMetadata, subscriptions);
        final PartitionAssignor.Assignment emptyAssignment10 = emptyAssignments.get("consumer10");
        Assert.assertEquals(
                Collections.emptySet(),
                new HashSet<TopicPartition>(emptyAssignment10.partitions()));

        final Set<TaskId> allActiveTasks = new HashSet<TaskId>();
        allActiveTasks.addAll(checkAssignment(Collections.emptySet(), emptyAssignment10).activeTasks);
        Assert.assertEquals(0, allActiveTasks.size());
        Assert.assertEquals(Collections.emptySet(), new HashSet<TaskId>(allActiveTasks));

        // Round two: same subscriptions, but now with the populated metadata.
        final Map<String, PartitionAssignor.Assignment> assignments =
                partitionAssignor.assign(metadata, subscriptions);
        final PartitionAssignor.Assignment assignment10 = assignments.get("consumer10");
        Assert.assertEquals(
                Utils.mkSet(Utils.mkSet(t1p0, t2p0, t1p1, t2p1, t1p2, t2p2)),
                Utils.mkSet(new HashSet<TopicPartition>(assignment10.partitions())));

        allActiveTasks.addAll(checkAssignment(allTopics, assignment10).activeTasks);
        Assert.assertEquals(3, allActiveTasks.size());
        Assert.assertEquals(allTasks, new HashSet<TaskId>(allActiveTasks));

        Assert.assertEquals(3, allActiveTasks.size());
        Assert.assertEquals(allTasks, allActiveTasks);
    }

    /**
     * Adds a third source topic (topic3, four partitions) so that a task exists which none of the
     * subscriptions lists, and verifies that the union of the three assignments covers task0..task3
     * and every partition of all three topics.
     */
    @Test
    public void testAssignWithNewTasks() throws Exception {
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addSource("source3", "topic3");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");

        final List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);

        // task3 is intentionally absent from every subscription below.
        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
        final Set<TaskId> noTasks = Collections.emptySet();

        final UUID uuid1 = UUID.randomUUID();
        final UUID uuid2 = UUID.randomUUID();

        final StreamThread thread10 = new StreamThread(
                builder, config, mockClientSupplier, "test", "client1", uuid1,
                new Metrics(), Time.SYSTEM,
                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", "client1"));
        partitionAssignor.setInternalTopicManager(
                new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

        final Map<String, PartitionAssignor.Subscription> subscriptions =
                new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid1, prevTasks10, noTasks, userEndPoint).encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid1, prevTasks11, noTasks, userEndPoint).encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid2, prevTasks20, noTasks, userEndPoint).encode()));

        final Map<String, PartitionAssignor.Assignment> assignments =
                partitionAssignor.assign(metadata, subscriptions);

        // Collect the union of active tasks and partitions over all consumers, in a fixed order.
        final Set<TaskId> allActiveTasks = new HashSet<TaskId>();
        final Set<TopicPartition> allPartitions = new HashSet<TopicPartition>();
        for (final String consumer : Arrays.asList("consumer10", "consumer11", "consumer20")) {
            final PartitionAssignor.Assignment assignment = assignments.get(consumer);
            allActiveTasks.addAll(AssignmentInfo.decode(assignment.userData()).activeTasks);
            allPartitions.addAll(assignment.partitions());
        }

        Assert.assertEquals(allTasks, allActiveTasks);
        Assert.assertEquals(
                Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3),
                allPartitions);
    }

    /**
     * Builds two sub-topologies with state stores and verifies that each of three consumers gets
     * two partitions and two active tasks, that all six tasks are assigned overall, and that
     * {@code tasksForState} maps each store to the tasks of the expected topic group.
     */
    @Test
    public void testAssignWithStates() throws Exception {
        final String applicationId = "test";
        builder.setApplicationId(applicationId);
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");

        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");

        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
        builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");

        final List<String> topics = Utils.mkList("topic1", "topic2");

        // Naming: task<topicGroupId><partition>.
        final TaskId task00 = new TaskId(0, 0);
        final TaskId task01 = new TaskId(0, 1);
        final TaskId task02 = new TaskId(0, 2);
        final TaskId task10 = new TaskId(1, 0);
        final TaskId task11 = new TaskId(1, 1);
        final TaskId task12 = new TaskId(1, 2);
        final List<TaskId> tasks = Utils.mkList(task00, task01, task02, task10, task11, task12);

        final UUID uuid1 = UUID.randomUUID();
        final UUID uuid2 = UUID.randomUUID();

        final StreamThread thread10 = new StreamThread(
                builder, config, mockClientSupplier, "test", "client1", uuid1,
                new Metrics(), Time.SYSTEM,
                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", "client1"));
        partitionAssignor.setInternalTopicManager(
                new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

        final Set<TaskId> noTasks = Collections.emptySet();
        final Map<String, PartitionAssignor.Subscription> subscriptions =
                new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid1, noTasks, noTasks, userEndPoint).encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid1, noTasks, noTasks, userEndPoint).encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid2, noTasks, noTasks, userEndPoint).encode()));

        final Map<String, PartitionAssignor.Assignment> assignments =
                partitionAssignor.assign(metadata, subscriptions);
        final PartitionAssignor.Assignment assignment10 = assignments.get("consumer10");
        final PartitionAssignor.Assignment assignment11 = assignments.get("consumer11");
        final PartitionAssignor.Assignment assignment20 = assignments.get("consumer20");

        // Six partitions spread evenly over three consumers.
        Assert.assertEquals(2, assignment10.partitions().size());
        Assert.assertEquals(2, assignment11.partitions().size());
        Assert.assertEquals(2, assignment20.partitions().size());

        final AssignmentInfo info10 = AssignmentInfo.decode(assignment10.userData());
        final AssignmentInfo info11 = AssignmentInfo.decode(assignment11.userData());
        final AssignmentInfo info20 = AssignmentInfo.decode(assignment20.userData());

        Assert.assertEquals(2, info10.activeTasks.size());
        Assert.assertEquals(2, info11.activeTasks.size());
        Assert.assertEquals(2, info20.activeTasks.size());

        final Set<TaskId> allActiveTasks = new HashSet<TaskId>();
        allActiveTasks.addAll(info10.activeTasks);
        allActiveTasks.addAll(info11.activeTasks);
        allActiveTasks.addAll(info20.activeTasks);
        Assert.assertEquals(new HashSet<TaskId>(tasks), allActiveTasks);

        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
        Assert.assertEquals(
                Utils.mkSet(task00, task01, task02),
                tasksForState(applicationId, "store1", tasks, topicGroups));
        Assert.assertEquals(
                Utils.mkSet(task10, task11, task12),
                tasksForState(applicationId, "store2", tasks, topicGroups));
        Assert.assertEquals(
                Utils.mkSet(task10, task11, task12),
                tasksForState(applicationId, "store3", tasks, topicGroups));
    }

    /**
     * Selects the tasks whose topic group declares the changelog topic of the given store.
     *
     * @param str  first argument passed to {@code ProcessorStateManager.storeChangelogTopic}
     *             (callers in this class pass the application id)
     * @param str2 second argument passed to {@code ProcessorStateManager.storeChangelogTopic}
     *             (callers in this class pass the store name)
     * @param list candidate tasks to filter
     * @param map  topic groups keyed by topic group id
     * @return the tasks from {@code list} whose {@code topicGroupId} matches a group that lists
     *         the store's changelog topic among its {@code stateChangelogTopics}
     */
    private Set<TaskId> tasksForState(String str, String str2, List<TaskId> list, Map<Integer, TopologyBuilder.TopicsInfo> map) {
        final String changelogTopic = ProcessorStateManager.storeChangelogTopic(str, str2);
        final Set<TaskId> matchingTasks = new HashSet<TaskId>();

        for (final Map.Entry<Integer, TopologyBuilder.TopicsInfo> group : map.entrySet()) {
            // Skip groups that do not own this store's changelog topic.
            if (!group.getValue().stateChangelogTopics.containsKey(changelogTopic)) {
                continue;
            }
            final int groupId = group.getKey().intValue();
            for (final TaskId candidate : list) {
                if (candidate.topicGroupId == groupId) {
                    matchingTasks.add(candidate);
                }
            }
        }
        return matchingTasks;
    }

    /**
     * Runs the basic three-consumer scenario with {@code num.standby.replicas} set to 1 and
     * verifies the active and standby tasks decoded from each consumer's assignment.
     */
    @Test
    public void testAssignWithStandbyReplicas() throws Exception {
        final Properties props = configProps();
        props.setProperty("num.standby.replicas", "1");
        final StreamsConfig standbyConfig = new StreamsConfig(props);

        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        final List<String> topics = Utils.mkList("topic1", "topic2");
        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
        final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
        final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);

        final UUID uuid1 = UUID.randomUUID();
        final UUID uuid2 = UUID.randomUUID();

        final StreamThread thread10 = new StreamThread(
                builder, standbyConfig, mockClientSupplier, "test", "client1", uuid1,
                new Metrics(), Time.SYSTEM,
                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(standbyConfig.getConsumerConfigs(thread10, "test", "client1"));
        partitionAssignor.setInternalTopicManager(
                new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

        final Map<String, PartitionAssignor.Subscription> subscriptions =
                new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));

        final Map<String, PartitionAssignor.Assignment> assignments =
                partitionAssignor.assign(metadata, subscriptions);

        final Set<TaskId> allActiveTasks = new HashSet<TaskId>();
        final Set<TaskId> allStandbyTasks = new HashSet<TaskId>();

        // The two consumers that share uuid1.
        final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);
        allStandbyTasks.addAll(info10.standbyTasks.keySet());

        final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
        allActiveTasks.addAll(info11.activeTasks);
        allStandbyTasks.addAll(info11.standbyTasks.keySet());

        Assert.assertNotEquals(
                "same processId has same set of standby tasks",
                info11.standbyTasks.keySet(),
                info10.standbyTasks.keySet());

        // Up to here only the uuid1 consumers have been tallied.
        Assert.assertEquals(Utils.mkSet(task0, task1), new HashSet<TaskId>(allActiveTasks));
        Assert.assertEquals(Utils.mkSet(task2), new HashSet<TaskId>(allStandbyTasks));

        // The consumer with the other process id.
        final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
        allActiveTasks.addAll(info20.activeTasks);
        allStandbyTasks.addAll(info20.standbyTasks.keySet());

        Assert.assertEquals(3, allActiveTasks.size());
        Assert.assertEquals(allTasks, allActiveTasks);

        Assert.assertEquals(3, allStandbyTasks.size());
        Assert.assertEquals(allTasks, allStandbyTasks);
    }

    /**
     * Feeds an encoded {@link AssignmentInfo} to {@code onAssignment} and verifies that the
     * assignor afterwards reports the expected active and standby task-to-partition maps.
     */
    @Test
    public void testOnAssignment() throws Exception {
        final TopicPartition t2p3 = new TopicPartition("topic2", 3);

        // This test uses its own topology builder instead of the shared field.
        final TopologyBuilder localBuilder = new TopologyBuilder();
        localBuilder.addSource("source1", "topic1");
        localBuilder.addSource("source2", "topic2");
        localBuilder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        final UUID uuid = UUID.randomUUID();
        final StreamThread thread = new StreamThread(
                localBuilder, config, mockClientSupplier, "test", "client1", uuid,
                new Metrics(), Time.SYSTEM,
                new StreamsMetadataState(localBuilder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", "client1"));

        final List<TaskId> activeTaskList = Utils.mkList(task0, task3);

        final Map<TaskId, Set<TopicPartition>> expectedActive = new HashMap<TaskId, Set<TopicPartition>>();
        expectedActive.put(task0, Utils.mkSet(t1p0));
        expectedActive.put(task3, Utils.mkSet(t2p3));

        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        standbyTasks.put(task1, Utils.mkSet(t1p0));
        standbyTasks.put(task2, Utils.mkSet(t2p0));

        final AssignmentInfo info = new AssignmentInfo(
                activeTaskList, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
        final PartitionAssignor.Assignment assignment =
                new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());

        partitionAssignor.onAssignment(assignment);

        Assert.assertEquals(expectedActive, partitionAssignor.activeTasks());
        Assert.assertEquals(standbyTasks, partitionAssignor.standbyTasks());
    }

    /**
     * Routes topic1 through an internal topic ("topicX") and verifies that, after assignment, the
     * internal topic manager has exactly one ready topic, "test-topicX", recorded with a value
     * equal to the number of tasks (three).
     */
    @Test
    public void testAssignWithInternalTopics() throws Exception {
        builder.setApplicationId("test");
        builder.addInternalTopic("topicX");
        builder.addSource("source1", "topic1");
        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        builder.addSink("sink1", "topicX", "processor1");
        builder.addSource("source2", "topicX");
        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");

        final List<String> topics = Utils.mkList("topic1", "test-topicX");
        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
        final UUID uuid1 = UUID.randomUUID();

        final StreamThread thread10 = new StreamThread(
                builder, config, mockClientSupplier, "test", "client1", uuid1,
                new Metrics(), Time.SYSTEM,
                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", "client1"));

        // Keep a handle on the mock so its recorded ready topics can be inspected after assign().
        final MockInternalTopicManager internalTopicManager =
                new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager(internalTopicManager);

        final Set<TaskId> noTasks = Collections.emptySet();
        final Map<String, PartitionAssignor.Subscription> subscriptions =
                new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid1, noTasks, noTasks, userEndPoint).encode()));

        partitionAssignor.assign(metadata, subscriptions);

        Assert.assertEquals(1, internalTopicManager.readyTopics.size());
        Assert.assertEquals(
                allTasks.size(),
                internalTopicManager.readyTopics.get("test-topicX").intValue());
    }

    /**
     * Chains two internal topics (topic1 -> "topicX" -> "topicZ") and verifies that, after
     * assignment, the internal topic manager has two ready topics and that "test-topicZ" is
     * recorded with a value equal to the number of tasks (three).
     */
    @Test
    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
        builder.setApplicationId("test");
        builder.addInternalTopic("topicX");
        builder.addSource("source1", "topic1");
        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        builder.addSink("sink1", "topicX", "processor1");
        builder.addSource("source2", "topicX");
        builder.addInternalTopic("topicZ");
        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
        builder.addSink("sink2", "topicZ", "processor2");
        builder.addSource("source3", "topicZ");

        final List<String> topics = Utils.mkList("topic1", "test-topicX", "test-topicZ");
        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
        final UUID uuid1 = UUID.randomUUID();

        final StreamThread thread10 = new StreamThread(
                builder, config, mockClientSupplier, "test", "client1", uuid1,
                new Metrics(), Time.SYSTEM,
                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", "client1"));

        // Keep a handle on the mock so its recorded ready topics can be inspected after assign().
        final MockInternalTopicManager internalTopicManager =
                new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager(internalTopicManager);

        final Set<TaskId> noTasks = Collections.emptySet();
        final Map<String, PartitionAssignor.Subscription> subscriptions =
                new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(
                topics, new SubscriptionInfo(uuid1, noTasks, noTasks, userEndPoint).encode()));

        partitionAssignor.assign(metadata, subscriptions);

        Assert.assertEquals(2, internalTopicManager.readyTopics.size());
        Assert.assertEquals(
                allTasks.size(),
                internalTopicManager.readyTopics.get("test-topicZ").intValue());
    }

    /**
     * Verifies that the value configured under {@code application.server} shows up as the
     * {@code userEndPoint} of the decoded subscription user data.
     */
    @Test
    public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
        final String applicationId = "application-id";
        final String endPoint = "localhost:8080";

        final Properties props = configProps();
        props.put("application.server", endPoint);
        final StreamsConfig endPointConfig = new StreamsConfig(props);

        builder.setApplicationId(applicationId);
        builder.addSource("source", "input");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        builder.addSink("sink", "output", "processor");

        final UUID uuid1 = UUID.randomUUID();
        final StreamThread thread = new StreamThread(
                builder, endPointConfig, mockClientSupplier, applicationId, "client1", uuid1,
                new Metrics(), Time.SYSTEM,
                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(endPointConfig.getConsumerConfigs(thread, applicationId, "client1"));

        final PartitionAssignor.Subscription subscription =
                partitionAssignor.subscription(Utils.mkSet("input"));
        final SubscriptionInfo decoded = SubscriptionInfo.decode(subscription.userData());

        Assert.assertEquals("localhost:8080", decoded.userEndPoint);
    }

    /**
     * Checks that the assignment handed to {@code consumer1} maps the host
     * {@code localhost:8080} to partitions 0, 1 and 2 of {@code topic1}.
     */
    @Test
    public void shouldMapUserEndPointToTopicPartitions() throws Exception {
        final String endPoint = "localhost:8080";
        final String applicationId = "application-id";
        final String clientId = "client1";

        final Properties props = configProps();
        props.put("application.server", endPoint);
        final StreamsConfig streamsConfig = new StreamsConfig(props);

        builder.setApplicationId(applicationId);
        builder.addSource("source", "topic1");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        builder.addSink("sink", "output", "processor");

        final List<String> topics = Utils.mkList("topic1");
        final UUID processId = UUID.randomUUID();
        final StreamThread thread = new StreamThread(builder, streamsConfig, mockClientSupplier, applicationId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);

        // This test drives a freshly created assignor rather than the shared fixture field.
        final StreamPartitionAssignor assignor = new StreamPartitionAssignor();
        assignor.configure(streamsConfig.getConsumerConfigs(thread, applicationId, clientId));
        assignor.setInternalTopicManager(new MockInternalTopicManager(thread.config, mockClientSupplier.restoreConsumer));

        final Set<TaskId> noTasks = Collections.emptySet();
        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer1", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(processId, noTasks, noTasks, endPoint).encode()));

        final Map<String, PartitionAssignor.Assignment> assignments = assignor.assign(metadata, subscriptions);
        final AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer1").userData());

        final Set<TopicPartition> expected = Utils.mkSet(
                new TopicPartition("topic1", 0),
                new TopicPartition("topic1", 1),
                new TopicPartition("topic1", 2));
        Assert.assertEquals(expected, info.partitionsByHost.get(new HostInfo("localhost", 8080)));
    }

    /**
     * Sets {@code application.server} to {@code "localhost"} (no port part) and expects a
     * {@link ConfigException} while building the consumer configs or configuring the assignor.
     */
    @Test
    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
        final String applicationId = "application-id";
        final String clientId = "client1";

        final Properties props = configProps();
        props.put("application.server", "localhost");
        final StreamsConfig streamsConfig = new StreamsConfig(props);
        final UUID processId = UUID.randomUUID();

        builder.setApplicationId(applicationId);
        final StreamThread thread = new StreamThread(builder, streamsConfig, mockClientSupplier, applicationId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread.config, mockClientSupplier.restoreConsumer));

        try {
            partitionAssignor.configure(streamsConfig.getConsumerConfigs(thread, applicationId, clientId));
            Assert.fail("expected to an exception due to invalid config");
        } catch (final ConfigException expected) {
            // Intentionally empty: reaching this handler is the passing outcome.
        }
    }

    /**
     * Sets {@code application.server} to {@code "localhost:j87yhk"} (non-numeric port part) and
     * expects a {@link ConfigException} while creating the stream thread, building the consumer
     * configs or configuring the assignor.
     */
    @Test
    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
        final String applicationId = "application-id";
        final String clientId = "client1";

        final Properties props = configProps();
        props.put("application.server", "localhost:j87yhk");
        final StreamsConfig streamsConfig = new StreamsConfig(props);
        final UUID processId = UUID.randomUUID();

        builder.setApplicationId(applicationId);

        try {
            // The thread is created inside the guarded region, so a ConfigException from here also passes.
            final StreamThread thread = new StreamThread(builder, streamsConfig, mockClientSupplier, applicationId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
            partitionAssignor.configure(streamsConfig.getConsumerConfigs(thread, applicationId, clientId));
            Assert.fail("expected to an exception due to invalid config");
        } catch (final ConfigException expected) {
            // Intentionally empty: reaching this handler is the passing outcome.
        }
    }

    /**
     * Checks that the host-to-partitions map carried by an assignment is what
     * {@code getPartitionsByHostState()} returns after {@code onAssignment}.
     */
    @Test
    public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
        final List<TopicPartition> assignedPartitions = Collections.singletonList(new TopicPartition("topic", 0));
        final Map<HostInfo, Set<TopicPartition>> hostState =
                Collections.singletonMap(new HostInfo("localhost", 80), Collections.singleton(new TopicPartition("topic", 0)));

        final List<TaskId> activeTasks = Collections.singletonList(new TaskId(0, 0));
        final Map<TaskId, Set<TopicPartition>> standbyTasks = Collections.emptyMap();
        final AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, hostState);

        partitionAssignor.onAssignment(new PartitionAssignor.Assignment(assignedPartitions, info.encode()));

        Assert.assertEquals(hostState, partitionAssignor.getPartitionsByHostState());
    }

    /**
     * Checks that after {@code onAssignment} the assignor's cluster metadata lists exactly one
     * partition for {@code "topic"}, and that it is partition 0 of that topic.
     */
    @Test
    public void shouldSetClusterMetadataOnAssignment() throws Exception {
        final List<TopicPartition> assignedPartitions = Collections.singletonList(new TopicPartition("topic", 0));
        final Map<HostInfo, Set<TopicPartition>> hostState =
                Collections.singletonMap(new HostInfo("localhost", 80), Collections.singleton(new TopicPartition("topic", 0)));
        final List<TaskId> activeTasks = Collections.singletonList(new TaskId(0, 0));
        final Map<TaskId, Set<TopicPartition>> standbyTasks = Collections.emptyMap();

        partitionAssignor.onAssignment(
                new PartitionAssignor.Assignment(assignedPartitions, new AssignmentInfo(activeTasks, standbyTasks, hostState).encode()));

        final Cluster cluster = partitionAssignor.clusterMetadata();
        // Keep the list in a local: the size assertion previously referenced an undeclared variable.
        final List<PartitionInfo> partitionInfos = cluster.partitionsForTopic("topic");
        // Assert the size before indexing so an empty result fails the assertion instead of throwing.
        Assert.assertEquals(1, partitionInfos.size());

        final PartitionInfo partitionInfo = partitionInfos.get(0);
        Assert.assertEquals("topic", partitionInfo.topic());
        Assert.assertEquals(0, partitionInfo.partition());
    }

    /**
     * Checks that {@code clusterMetadata()} returns a non-null value when this test has not
     * delivered any assignment to the assignor beforehand.
     */
    @Test
    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception {
        final Cluster cluster = partitionAssignor.clusterMetadata();
        // NOTE(review): only non-nullness is asserted; emptiness (per the test name) is not verified here.
        Assert.assertNotNull(cluster);
    }

    /**
     * Builds a windowed join between a re-keyed stream on {@code unknownTopic} and a re-keyed,
     * counted and re-mapped stream on {@code topic1}, subscribes a single client to
     * {@code unknownTopic} only, and runs the assignment.
     *
     * <p>Asserts that the topic manager was asked to make ready exactly the {@code count}
     * repartition and changelog topics (3 partitions each), and that the client is assigned
     * exactly partitions 0-2 of {@code topic1} and of the {@code count} repartition topic.
     */
    @Test
    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
        final String applicationId = "application-id";
        final String clientId = "client1";

        // All callbacks are stubs that return null; only the topology shape matters for this test.
        final KeyValueMapper<Object, Object, Object> unknownTopicKeySelector = new KeyValueMapper<Object, Object, Object>() {
            @Override
            public Object apply(Object key, Object value) {
                return null;
            }
        };
        final KeyValueMapper<Object, Object, Object> topic1KeySelector = new KeyValueMapper<Object, Object, Object>() {
            @Override
            public Object apply(Object key, Object value) {
                return null;
            }
        };
        final KeyValueMapper<Object, Long, KeyValue<Object, Object>> countMapper = new KeyValueMapper<Object, Long, KeyValue<Object, Object>>() {
            @Override
            public KeyValue<Object, Object> apply(Object key, Long count) {
                return null;
            }
        };
        final ValueJoiner<Object, Object, Object> joiner = new ValueJoiner<Object, Object, Object>() {
            @Override
            public Object apply(Object left, Object right) {
                return null;
            }
        };

        final KStreamBuilder streamBuilder = new KStreamBuilder();
        streamBuilder.setApplicationId(applicationId);
        streamBuilder.stream("unknownTopic")
                .selectKey(unknownTopicKeySelector)
                .join(
                        streamBuilder.stream("topic1")
                                .selectKey(topic1KeySelector)
                                .groupByKey()
                                .count("count")
                                .toStream()
                                .map(countMapper),
                        joiner,
                        JoinWindows.of(0L));

        final UUID processId = UUID.randomUUID();
        final StreamThread thread = new StreamThread(streamBuilder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(streamBuilder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread, applicationId, clientId));
        final MockInternalTopicManager topicManager = new MockInternalTopicManager(thread.config, mockClientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager(topicManager);

        final Set<TaskId> noTasks = Collections.emptySet();
        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put(clientId, new PartitionAssignor.Subscription(Collections.singletonList("unknownTopic"), new SubscriptionInfo(processId, noTasks, noTasks, "localhost:2171").encode()));

        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

        final Map<String, Integer> expectedReadyTopics = new HashMap<String, Integer>();
        expectedReadyTopics.put("application-id-count-repartition", 3);
        expectedReadyTopics.put("application-id-count-changelog", 3);
        Assert.assertThat(topicManager.readyTopics, CoreMatchers.equalTo(expectedReadyTopics));

        final Set<TopicPartition> actualPartitions = new HashSet<TopicPartition>(assignments.get(clientId).partitions());
        final Set<TopicPartition> expectedPartitions = new HashSet<TopicPartition>(Arrays.asList(
                new TopicPartition("topic1", 0),
                new TopicPartition("topic1", 1),
                new TopicPartition("topic1", 2),
                new TopicPartition("application-id-count-repartition", 0),
                new TopicPartition("application-id-count-repartition", 1),
                new TopicPartition("application-id-count-repartition", 2)));
        Assert.assertThat(actualPartitions, CoreMatchers.equalTo(expectedPartitions));
    }

    /**
     * Delivers two assignments with different host-to-partitions maps and checks that
     * {@code getPartitionsByHostState()} equals the map of the most recent one each time.
     */
    @Test
    public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception {
        final TopicPartition partitionOne = new TopicPartition("topic", 1);
        final TopicPartition partitionTwo = new TopicPartition("topic", 2);

        // First assignment: a single host owns both partitions.
        final Map<HostInfo, Set<TopicPartition>> firstHostState =
                Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet(partitionOne, partitionTwo));

        // Second assignment: the partitions are split across two hosts.
        final Map<HostInfo, Set<TopicPartition>> secondHostState = new HashMap<HostInfo, Set<TopicPartition>>();
        secondHostState.put(new HostInfo("localhost", 9090), Utils.mkSet(partitionOne));
        secondHostState.put(new HostInfo("other", 9090), Utils.mkSet(partitionTwo));

        partitionAssignor.onAssignment(createAssignment(firstHostState));
        Assert.assertEquals(firstHostState, partitionAssignor.getPartitionsByHostState());

        partitionAssignor.onAssignment(createAssignment(secondHostState));
        Assert.assertEquals(secondHostState, partitionAssignor.getPartitionsByHostState());
    }

    /**
     * Delivers two assignments in a row and checks that the topic set reported by
     * {@code clusterMetadata()} follows the host state of the latest assignment:
     * first {@code topic} only, then {@code topic} and {@code topic2}.
     */
    @Test
    public void shouldUpdateClusterMetadataOnAssignment() throws Exception {
        final TopicPartition topicPartitionOne = new TopicPartition("topic", 1);
        final TopicPartition topic2PartitionTwo = new TopicPartition("topic2", 2);

        final Map<HostInfo, Set<TopicPartition>> firstHostState =
                Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet(topicPartitionOne));
        final Map<HostInfo, Set<TopicPartition>> secondHostState =
                Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet(topicPartitionOne, topic2PartitionTwo));

        partitionAssignor.onAssignment(createAssignment(firstHostState));
        Assert.assertEquals(Utils.mkSet("topic"), partitionAssignor.clusterMetadata().topics());

        partitionAssignor.onAssignment(createAssignment(secondHostState));
        Assert.assertEquals(Utils.mkSet("topic", "topic2"), partitionAssignor.clusterMetadata().topics());
    }

    /**
     * With {@code num.standby.replicas} set to 1 and two consumers on different hosts, checks
     * the per-host partition sets in {@code consumer1}'s assignment: neither host's set equals
     * the full set of {@code topic1} partitions, while the union of both does.
     */
    @Test
    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
        final String applicationId = "appId";
        final String clientId = "client1";

        final Properties props = configProps();
        props.setProperty("num.standby.replicas", "1");
        final StreamsConfig streamsConfig = new StreamsConfig(props);

        final KStreamBuilder streamBuilder = new KStreamBuilder();
        streamBuilder.setApplicationId(applicationId);
        streamBuilder.stream("topic1").groupByKey().count("count");

        final UUID processId = UUID.randomUUID();
        final StreamThread thread = new StreamThread(streamBuilder, streamsConfig, mockClientSupplier, applicationId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(streamBuilder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(streamsConfig.getConsumerConfigs(thread, applicationId, clientId));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread.config, mockClientSupplier.restoreConsumer));

        final Set<TaskId> noTasks = Collections.emptySet();
        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer1", new PartitionAssignor.Subscription(Collections.singletonList("topic1"), new SubscriptionInfo(processId, noTasks, noTasks, "localhost:2171").encode()));
        subscriptions.put("consumer2", new PartitionAssignor.Subscription(Collections.singletonList("topic1"), new SubscriptionInfo(UUID.randomUUID(), noTasks, noTasks, "other:9090").encode()));

        final Set<TopicPartition> allPartitions = Utils.mkSet(t1p0, t1p1, t1p2);

        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
        final AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer1").userData());
        final Set<TopicPartition> localhostPartitions = info.partitionsByHost.get(new HostInfo("localhost", 2171));
        final Set<TopicPartition> otherPartitions = info.partitionsByHost.get(new HostInfo("other", 9090));

        final Set<TopicPartition> combined = new HashSet<TopicPartition>(localhostPartitions);
        combined.addAll(otherPartitions);

        Assert.assertThat(localhostPartitions, CoreMatchers.not(allPartitions));
        Assert.assertThat(otherPartitions, CoreMatchers.not(allPartitions));
        Assert.assertThat(combined, CoreMatchers.equalTo(allPartitions));
    }

    /**
     * Expects {@code configure} to throw a {@link KafkaException} when the supplied configs
     * hold nothing but a {@code num.standby.replicas} entry.
     */
    @Test(expected = KafkaException.class)
    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() throws Exception {
        final Map<String, Integer> configs = Collections.singletonMap("num.standby.replicas", 1);
        partitionAssignor.configure(configs);
    }

    /**
     * Expects {@code configure} to throw a {@link KafkaException} when the
     * {@code __stream.thread.instance__} entry holds a plain string.
     */
    @Test(expected = KafkaException.class)
    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() throws Exception {
        final Map<String, Object> configs = new HashMap<String, Object>();
        configs.put("num.standby.replicas", 1);
        configs.put("__stream.thread.instance__", "i am not a stream thread");

        partitionAssignor.configure(configs);
    }

    /**
     * Builds an assignment with no partitions whose user data encodes an
     * {@link AssignmentInfo} with no active tasks, no standby tasks and the given host state.
     *
     * @param map host-to-partitions mapping to embed in the assignment's user data
     * @return the assignment wrapping the encoded info
     */
    private PartitionAssignor.Assignment createAssignment(Map<HostInfo, Set<TopicPartition>> map) {
        final List<TopicPartition> noPartitions = Collections.emptyList();
        final List<TaskId> noActiveTasks = Collections.emptyList();
        final Map<TaskId, Set<TopicPartition>> noStandbyTasks = Collections.emptyMap();

        final AssignmentInfo info = new AssignmentInfo(noActiveTasks, noStandbyTasks, map);
        return new PartitionAssignor.Assignment(noPartitions, info.encode());
    }

    /**
     * Decodes the user data of {@code assignment} and cross-checks it against the assigned
     * partitions.
     *
     * <p>Asserts, in order: the number of active tasks equals the number of assigned partitions;
     * the active tasks equal {@code TaskId(0, partition)} for each assigned partition in order;
     * the topics of the assigned partitions equal {@code set}; every standby partition's number
     * matches its task's partition; and, when any standby task exists, the standby topics also
     * equal {@code set}.
     *
     * @param set        topic names the assignment is expected to cover
     * @param assignment assignment whose partitions and user data are verified
     * @return the decoded assignment info
     */
    private AssignmentInfo checkAssignment(Set<String> set, PartitionAssignor.Assignment assignment) {
        final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        Assert.assertEquals(assignment.partitions().size(), info.activeTasks.size());

        // Derive the expected task ids and the covered topics from the assigned partitions.
        final List<TaskId> expectedActiveTasks = new ArrayList<TaskId>();
        final Set<String> activeTopics = new HashSet<String>();
        for (final TopicPartition assigned : assignment.partitions()) {
            expectedActiveTasks.add(new TaskId(0, assigned.partition()));
            activeTopics.add(assigned.topic());
        }
        Assert.assertEquals(expectedActiveTasks, info.activeTasks);
        Assert.assertEquals(set, activeTopics);

        final Set<String> standbyTopics = new HashSet<String>();
        for (final Map.Entry<TaskId, Set<TopicPartition>> standby : info.standbyTasks.entrySet()) {
            final TaskId standbyTask = standby.getKey();
            for (final TopicPartition standbyPartition : standby.getValue()) {
                Assert.assertEquals(standbyTask.partition, standbyPartition.partition());
                standbyTopics.add(standbyPartition.topic());
            }
        }
        // The standby topic check only applies when standby tasks were assigned at all.
        if (!info.standbyTasks.isEmpty()) {
            Assert.assertEquals(set, standbyTopics);
        }
        return info;
    }
}
