package org.apache.kafka.streams.processor.internals.assignment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.assignment.TaskInfo;
import org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignmentUtilsTest;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/assignment/CustomStickyTaskAssignorTest.class */
public class CustomStickyTaskAssignorTest {
    private TaskAssignor assignor;

    /**
     * Creates a fresh {@link StickyTaskAssignor} before every test so no assignor instance is
     * shared between test methods.
     */
    @BeforeEach
    public void setUp() {
        assignor = new StickyTaskAssignor();
    }

    /**
     * Assigns three tasks across three processes and verifies that every resulting
     * assignment contains exactly one task.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCount(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, false)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, str);

        for (KafkaStreamsAssignment assignment : assignments.values()) {
            MatcherAssert.assertThat(assignment.tasks().size(), Matchers.equalTo(1));
        }
    }

    /**
     * Assigns six tasks (three each from the TASK_1_x and TASK_2_x groups) across three processes
     * and delegates the verification to {@code assertActiveTaskTopicGroupIdsEvenlyDistributed}.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithNoStandByTasks(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 2, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(2, 2, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(3, 2, Optional.empty())
        );
        // Insertion order of the task entries is kept exactly as in the fixture.
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_1_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_1_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_2_2, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_2_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_2_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_1_2, false)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, str);

        assertActiveTaskTopicGroupIdsEvenlyDistributed(assignments);
    }

    /**
     * Same scenario as the no-standby variant, but calls the four-argument {@code assign}
     * overload with {@code 1} (presumably the number of standby replicas — confirm against
     * the helper) before delegating to {@code assertActiveTaskTopicGroupIdsEvenlyDistributed}.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithStandByTasks(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 2, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(2, 2, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(3, 2, Optional.empty())
        );
        // Insertion order of the task entries is kept exactly as in the fixture.
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_2_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_1_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_1_2, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_1_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_2_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_2_2, false)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, 1, str);

        assertActiveTaskTopicGroupIdsEvenlyDistributed(assignments);
    }

    /**
     * Runs the assignor twice over the same three tasks. The two processes are seeded with
     * TASK_0_0 / TASK_0_1 in the first run and with the swapped pair in the second (fourth
     * {@code mkStreamState} argument — presumably the previously active tasks; confirm against
     * the helper). Each run must assign every process the task it was seeded with as ACTIVE.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldNotMigrateActiveTaskToOtherProcess(String str) {
        final KafkaStreamsAssignment.AssignedTask.Type active = KafkaStreamsAssignment.AssignedTask.Type.ACTIVE;

        Map<ProcessId, KafkaStreamsState> firstStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_0), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_1), Set.of())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, false)
        );

        Map<ProcessId, KafkaStreamsAssignment> firstRun = assign(firstStates, tasks, str);
        assertHasAssignment(firstRun, 1, AssignmentTestUtils.TASK_0_0, active);
        assertHasAssignment(firstRun, 2, AssignmentTestUtils.TASK_0_1, active);

        // Second run: same tasks, seeded task sets swapped between the two processes.
        Map<ProcessId, KafkaStreamsState> swappedStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_1), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_0), Set.of())
        );

        Map<ProcessId, KafkaStreamsAssignment> secondRun = assign(swappedStates, tasks, str);
        assertHasAssignment(secondRun, 1, AssignmentTestUtils.TASK_0_1, active);
        assertHasAssignment(secondRun, 2, AssignmentTestUtils.TASK_0_0, active);
    }

    /**
     * Process 1 is seeded with TASK_0_0 and TASK_0_2, process 2 with TASK_0_1, and process 3
     * with nothing. After assignment each process must hold exactly one task, and process 2
     * must still hold TASK_0_1 as ACTIVE.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldMigrateActiveTasksToNewProcessWithoutChangingAllAssignments(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_1), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, false)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, str);

        KafkaStreamsAssignment first = assignments.get(TaskAssignmentUtilsTest.processId(1));
        MatcherAssert.assertThat(first.tasks().values().size(), Matchers.equalTo(1));
        KafkaStreamsAssignment second = assignments.get(TaskAssignmentUtilsTest.processId(2));
        MatcherAssert.assertThat(second.tasks().values().size(), Matchers.equalTo(1));
        KafkaStreamsAssignment third = assignments.get(TaskAssignmentUtilsTest.processId(3));
        MatcherAssert.assertThat(third.tasks().values().size(), Matchers.equalTo(1));

        assertHasAssignment(assignments, 2, AssignmentTestUtils.TASK_0_1, KafkaStreamsAssignment.AssignedTask.Type.ACTIVE);
    }

    /**
     * Two processes are created with second {@code mkStreamState} argument 1 and 2 respectively
     * (presumably the capacity / thread count — confirm against the helper). With three tasks,
     * process 1 must receive one task and process 2 two.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignBasedOnCapacity(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(2, 2, Optional.empty())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, false)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, str);

        KafkaStreamsAssignment smaller = assignments.get(TaskAssignmentUtilsTest.processId(1));
        MatcherAssert.assertThat(smaller.tasks().values().size(), Matchers.equalTo(1));
        KafkaStreamsAssignment larger = assignments.get(TaskAssignmentUtilsTest.processId(2));
        MatcherAssert.assertThat(larger.tasks().values().size(), Matchers.equalTo(2));
    }

    /**
     * Process 1 is seeded with all seven tasks (six TASK_0_x plus TASK_1_0) and process 2 with
     * none. After assignment the active tasks must be split 3/4 or 4/3 between the two
     * processes, and process 2 must hold exactly the tasks that process 1 does not.
     *
     * <p>The expected remainder is computed into a fresh set rather than by calling
     * {@code removeAll} on {@code tasks.keySet()}: that key set is a live view of the map, so
     * removing from it would delete entries from the fixture map itself.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignTasksEvenlyWithUnequalTopicGroupSizes(String str) {
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_1_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_3, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_4, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_5, false)
        );
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(
                1,
                1,
                Optional.empty(),
                Set.of(
                    AssignmentTestUtils.TASK_0_0,
                    AssignmentTestUtils.TASK_0_1,
                    AssignmentTestUtils.TASK_0_2,
                    AssignmentTestUtils.TASK_0_3,
                    AssignmentTestUtils.TASK_0_4,
                    AssignmentTestUtils.TASK_0_5,
                    AssignmentTestUtils.TASK_1_0
                ),
                Set.of()
            ),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty())
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, str);

        Set<TaskId> client1Tasks = assignments.get(TaskAssignmentUtilsTest.processId(1)).tasks().values().stream()
            .filter(task -> task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE)
            .map(KafkaStreamsAssignment.AssignedTask::id)
            .collect(Collectors.toSet());
        Set<TaskId> client2Tasks = assignments.get(TaskAssignmentUtilsTest.processId(2)).tasks().values().stream()
            .filter(task -> task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE)
            .map(KafkaStreamsAssignment.AssignedTask::id)
            .collect(Collectors.toSet());

        // Seven tasks over two processes: either side may get the extra one.
        boolean evenSplit = (client1Tasks.size() == 3 && client2Tasks.size() == 4)
            || (client1Tasks.size() == 4 && client2Tasks.size() == 3);
        MatcherAssert.assertThat(evenSplit, Matchers.is(true));

        // Everything not on process 1 must be on process 2; built without touching the fixture map.
        Set<TaskId> remainingTasks = tasks.keySet().stream()
            .filter(taskId -> !client1Tasks.contains(taskId))
            .collect(Collectors.toSet());
        MatcherAssert.assertThat(client2Tasks, Matchers.equalTo(remainingTasks));
    }

    /**
     * Five processes, three tasks, two runs. In the first run processes 1-3 are seeded with one
     * task each; in the second run processes 3-5 are. In both runs the seeded processes must
     * each hold exactly their seeded task as ACTIVE and the unseeded processes must hold none.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldKeepActiveTaskStickinessWhenMoreClientThanActiveTasks(String str) {
        final KafkaStreamsAssignment.AssignedTask.Type active = KafkaStreamsAssignment.AssignedTask.Type.ACTIVE;

        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, false)
        );

        // First run: processes 1, 2 and 3 carry the seeded tasks.
        Map<ProcessId, KafkaStreamsState> firstStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_0), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_2), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_1), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(4, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(5, 1, Optional.empty())
        );
        Map<ProcessId, KafkaStreamsAssignment> firstRun = assign(firstStates, tasks, str);

        MatcherAssert.assertThat(firstRun.get(TaskAssignmentUtilsTest.processId(1)).tasks().size(), Matchers.is(1));
        MatcherAssert.assertThat(firstRun.get(TaskAssignmentUtilsTest.processId(2)).tasks().size(), Matchers.is(1));
        MatcherAssert.assertThat(firstRun.get(TaskAssignmentUtilsTest.processId(3)).tasks().size(), Matchers.is(1));
        MatcherAssert.assertThat(firstRun.get(TaskAssignmentUtilsTest.processId(4)).tasks().size(), Matchers.is(0));
        MatcherAssert.assertThat(firstRun.get(TaskAssignmentUtilsTest.processId(5)).tasks().size(), Matchers.is(0));
        assertHasAssignment(firstRun, 1, AssignmentTestUtils.TASK_0_0, active);
        assertHasAssignment(firstRun, 2, AssignmentTestUtils.TASK_0_2, active);
        assertHasAssignment(firstRun, 3, AssignmentTestUtils.TASK_0_1, active);

        // Second run: the seeded tasks now sit on processes 3, 4 and 5.
        Map<ProcessId, KafkaStreamsState> secondStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_1), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(4, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_0), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(5, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_2), Set.of())
        );
        Map<ProcessId, KafkaStreamsAssignment> secondRun = assign(secondStates, tasks, str);

        MatcherAssert.assertThat(secondRun.get(TaskAssignmentUtilsTest.processId(1)).tasks().size(), Matchers.is(0));
        MatcherAssert.assertThat(secondRun.get(TaskAssignmentUtilsTest.processId(2)).tasks().size(), Matchers.is(0));
        MatcherAssert.assertThat(secondRun.get(TaskAssignmentUtilsTest.processId(3)).tasks().size(), Matchers.is(1));
        MatcherAssert.assertThat(secondRun.get(TaskAssignmentUtilsTest.processId(4)).tasks().size(), Matchers.is(1));
        MatcherAssert.assertThat(secondRun.get(TaskAssignmentUtilsTest.processId(5)).tasks().size(), Matchers.is(1));
        assertHasAssignment(secondRun, 3, AssignmentTestUtils.TASK_0_1, active);
        assertHasAssignment(secondRun, 4, AssignmentTestUtils.TASK_0_0, active);
        assertHasAssignment(secondRun, 5, AssignmentTestUtils.TASK_0_2, active);
    }

    /**
     * Each of three processes is seeded, via the fifth {@code mkStreamState} argument
     * (presumably the previous standby tasks — confirm against the helper), with a different
     * task. The assignment must give each process that same task as ACTIVE.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignTasksToClientWithPreviousStandbyTasks(String str) {
        final KafkaStreamsAssignment.AssignedTask.Type active = KafkaStreamsAssignment.AssignedTask.Type.ACTIVE;

        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty(), Set.of(), Set.of(AssignmentTestUtils.TASK_0_2)),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty(), Set.of(), Set.of(AssignmentTestUtils.TASK_0_1)),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty(), Set.of(), Set.of(AssignmentTestUtils.TASK_0_0))
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, false)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, str);

        assertHasAssignment(assignments, 1, AssignmentTestUtils.TASK_0_2, active);
        assertHasAssignment(assignments, 2, AssignmentTestUtils.TASK_0_1, active);
        assertHasAssignment(assignments, 3, AssignmentTestUtils.TASK_0_0, active);
    }

    /**
     * Two processes (second {@code mkStreamState} argument 1 and 2) are seeded with TASK_0_0 and
     * TASK_0_2 respectively, and both list TASK_0_1 in the fifth argument. Process 1 must end
     * up with only TASK_0_0; process 2 with TASK_0_1 and TASK_0_2, all as ACTIVE.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks(String str) {
        final KafkaStreamsAssignment.AssignedTask.Type active = KafkaStreamsAssignment.AssignedTask.Type.ACTIVE;

        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_0), Set.of(AssignmentTestUtils.TASK_0_1)),
            TaskAssignmentUtilsTest.mkStreamState(2, 2, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_2), Set.of(AssignmentTestUtils.TASK_0_1))
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, false)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, str);

        MatcherAssert.assertThat(assignments.get(TaskAssignmentUtilsTest.processId(1)).tasks().size(), Matchers.is(1));
        MatcherAssert.assertThat(assignments.get(TaskAssignmentUtilsTest.processId(2)).tasks().size(), Matchers.is(2));
        assertHasAssignment(assignments, 1, AssignmentTestUtils.TASK_0_0, active);
        assertHasAssignment(assignments, 2, AssignmentTestUtils.TASK_0_1, active);
        assertHasAssignment(assignments, 2, AssignmentTestUtils.TASK_0_2, active);
    }

    /**
     * Four processes, each seeded with one of four tasks, assigned via the four-argument
     * {@code assign} overload with {@code 1}. Verifies that:
     * <ul>
     *   <li>no process holds more than two standby tasks;</li>
     *   <li>no process holds a standby of the task it was seeded with;</li>
     *   <li>every process holds its seeded task as active;</li>
     *   <li>at least three processes hold some standby task;</li>
     *   <li>the standby tasks across all processes cover all four task ids.</li>
     * </ul>
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignStandbyTasksToDifferentClientThanCorrespondingActiveTaskIsAssignedTo(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_0), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_1), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_2), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(4, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_3), Set.of())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_3, true)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, 1, str);

        // Upper bound on standbys per process.
        for (int client = 1; client <= 4; client++) {
            MatcherAssert.assertThat(standbyTasks(assignments, client).size(), Matchers.lessThanOrEqualTo(2));
        }

        // A standby must not sit next to its own active task.
        MatcherAssert.assertThat(standbyTasks(assignments, 1), Matchers.not(Matchers.hasItems(AssignmentTestUtils.TASK_0_0)));
        MatcherAssert.assertThat(standbyTasks(assignments, 2), Matchers.not(Matchers.hasItems(AssignmentTestUtils.TASK_0_1)));
        MatcherAssert.assertThat(standbyTasks(assignments, 3), Matchers.not(Matchers.hasItems(AssignmentTestUtils.TASK_0_2)));
        MatcherAssert.assertThat(standbyTasks(assignments, 4), Matchers.not(Matchers.hasItems(AssignmentTestUtils.TASK_0_3)));

        // Seeded tasks stay active where they were.
        MatcherAssert.assertThat(activeTasks(assignments, 1), Matchers.hasItems(AssignmentTestUtils.TASK_0_0));
        MatcherAssert.assertThat(activeTasks(assignments, 2), Matchers.hasItems(AssignmentTestUtils.TASK_0_1));
        MatcherAssert.assertThat(activeTasks(assignments, 3), Matchers.hasItems(AssignmentTestUtils.TASK_0_2));
        MatcherAssert.assertThat(activeTasks(assignments, 4), Matchers.hasItems(AssignmentTestUtils.TASK_0_3));

        int clientsWithStandbys = 0;
        for (int client = 1; client <= 4; client++) {
            if (!standbyTasks(assignments, client).isEmpty()) {
                clientsWithStandbys++;
            }
        }
        MatcherAssert.assertThat(clientsWithStandbys, Matchers.greaterThanOrEqualTo(3));

        Set<TaskId> allStandbyIds = allTasks(assignments).stream()
            .filter(task -> task.type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY)
            .map(KafkaStreamsAssignment.AssignedTask::id)
            .collect(Collectors.toSet());
        MatcherAssert.assertThat(
            allStandbyIds,
            Matchers.equalTo(Set.of(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3))
        );
    }

    /**
     * Three processes, each seeded with one of three tasks, assigned via the four-argument
     * {@code assign} overload with {@code 2}. Every process must keep its seeded task as its
     * only active task and hold the other two tasks as standbys.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignMultipleReplicasOfStandbyTask(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_0), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_1), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_2), Set.of())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, true)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, 2, str);

        // Active tasks stay where they were seeded.
        MatcherAssert.assertThat(activeTasks(assignments, 1), Matchers.equalTo(Set.of(AssignmentTestUtils.TASK_0_0)));
        MatcherAssert.assertThat(activeTasks(assignments, 2), Matchers.equalTo(Set.of(AssignmentTestUtils.TASK_0_1)));
        MatcherAssert.assertThat(activeTasks(assignments, 3), Matchers.equalTo(Set.of(AssignmentTestUtils.TASK_0_2)));

        // Each process backs up the two tasks it does not run actively.
        MatcherAssert.assertThat(
            standbyTasks(assignments, 1),
            Matchers.equalTo(Set.of(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2))
        );
        MatcherAssert.assertThat(
            standbyTasks(assignments, 2),
            Matchers.equalTo(Set.of(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2))
        );
        MatcherAssert.assertThat(
            standbyTasks(assignments, 3),
            Matchers.equalTo(Set.of(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1))
        );
    }

    /**
     * A single process seeded with the only task, assigned via the four-argument
     * {@code assign} overload with {@code 2}. The process must hold the task as active and
     * hold no standby tasks.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldNotAssignStandbyTaskReplicasWhenNoClientAvailableWithoutHavingTheTaskAssigned(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_0), Set.of())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, true)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, 2, str);

        MatcherAssert.assertThat(activeTasks(assignments, 1), Matchers.equalTo(Set.of(AssignmentTestUtils.TASK_0_0)));
        MatcherAssert.assertThat(standbyTasks(assignments, 1), Matchers.equalTo(Set.of()));
    }

    /**
     * Three unseeded processes and three tasks, assigned via the four-argument {@code assign}
     * overload with {@code 1}. Across all processes, the set of active task ids and the set of
     * standby task ids must each equal the full set of three tasks.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignActiveAndStandbyTasks(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, true)
        );

        List<KafkaStreamsAssignment.AssignedTask> assigned = allTasks(assign(streamStates, tasks, 1, str));

        Set<TaskId> activeIds = assigned.stream()
            .filter(task -> task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE)
            .map(KafkaStreamsAssignment.AssignedTask::id)
            .collect(Collectors.toSet());
        MatcherAssert.assertThat(
            activeIds,
            Matchers.equalTo(Set.of(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2))
        );

        Set<TaskId> standbyIds = assigned.stream()
            .filter(task -> task.type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY)
            .map(KafkaStreamsAssignment.AssignedTask::id)
            .collect(Collectors.toSet());
        MatcherAssert.assertThat(
            standbyIds,
            Matchers.equalTo(Set.of(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2))
        );
    }

    /**
     * Three processes whose second {@code mkStreamState} argument is 3, 1 and 1, with three
     * tasks to assign. Each process must end up with exactly one active task.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignAtLeastOneTaskToEachClientIfPossible(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 3, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, false)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, str);

        for (int client = 1; client <= 3; client++) {
            MatcherAssert.assertThat(activeTasks(assignments, client).size(), Matchers.is(1));
        }
    }

    /**
     * Six unseeded processes and three tasks. Across all processes the active task ids must
     * equal the three tasks, there must be no standby tasks, and at least three processes
     * must have a non-empty assignment.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignEachActiveTaskToOneClientWhenMoreClientsThanTasks(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(4, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(5, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(6, 1, Optional.empty())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, false)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, str);
        List<KafkaStreamsAssignment.AssignedTask> assigned = allTasks(assignments);

        Set<TaskId> activeIds = assigned.stream()
            .filter(task -> task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE)
            .map(KafkaStreamsAssignment.AssignedTask::id)
            .collect(Collectors.toSet());
        MatcherAssert.assertThat(
            activeIds,
            Matchers.equalTo(Set.of(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2))
        );

        Set<TaskId> standbyIds = assigned.stream()
            .filter(task -> task.type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY)
            .map(KafkaStreamsAssignment.AssignedTask::id)
            .collect(Collectors.toSet());
        MatcherAssert.assertThat(standbyIds, Matchers.equalTo(Set.of()));

        // Count the processes that received at least one task.
        int clientsWithTasks = 0;
        for (KafkaStreamsAssignment assignment : assignments.values()) {
            if (!assignment.tasks().isEmpty()) {
                clientsWithTasks++;
            }
        }
        MatcherAssert.assertThat(clientsWithTasks, Matchers.greaterThanOrEqualTo(3));
    }

    /**
     * Six unseeded processes and three tasks, assigned via the four-argument {@code assign}
     * overload with {@code 1}. No process may end up with an empty task collection.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldBalanceActiveAndStandbyTasksAcrossAvailableClients(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(4, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(5, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(6, 1, Optional.empty())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, true)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, 1, str);

        for (KafkaStreamsAssignment assignment : assignments.values()) {
            MatcherAssert.assertThat(assignment.tasks().values(), Matchers.not(Matchers.hasSize(0)));
        }
    }

    /**
     * Two processes whose second {@code mkStreamState} argument is 1 and 2, with twelve tasks
     * (TASK_0_x through TASK_3_x, three each). Process 1 must receive four active tasks and
     * process 2 eight.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignMoreTasksToClientWithMoreCapacity(String str) {
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(2, 2, Optional.empty())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_1_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_1_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_1_2, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_2_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_2_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_2_2, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_3_0, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_3_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_3_2, false)
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, str);

        MatcherAssert.assertThat(activeTasks(assignments, 1).size(), Matchers.equalTo(4));
        MatcherAssert.assertThat(activeTasks(assignments, 2).size(), Matchers.equalTo(8));
    }

    // NOTE(review): this test has an empty body, so it asserts nothing and always passes.
    // Presumably a placeholder for a scenario that has not been written for this assignor yet;
    // confirm, then either implement it or remove it so the report does not show false coverage.
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @Test
    public void shouldEvenlyDistributeByTaskIdAndPartition() {
    }

    /**
     * Four unseeded processes and four tasks, assigned via the four-argument {@code assign}
     * overload with {@code 1}. For every ordered pair of distinct processes, the key sets of
     * their assigned tasks must differ.
     *
     * @param str value supplied by {@code @ValueSource} and forwarded unchanged to {@code assign}
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldNotHaveSameAssignmentOnAnyTwoHosts(String str) {
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_3, true)
        );
        Map<ProcessId, KafkaStreamsState> streamStates = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(4, 1, Optional.empty())
        );

        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(streamStates, tasks, 1, str);

        for (KafkaStreamsState left : streamStates.values()) {
            for (KafkaStreamsState right : streamStates.values()) {
                // A process is trivially identical to itself; only compare distinct processes.
                if (left.processId().equals(right.processId())) {
                    continue;
                }
                Set<TaskId> leftTasks = assignments.get(left.processId()).tasks().keySet();
                Set<TaskId> rightTasks = assignments.get(right.processId()).tasks().keySet();
                MatcherAssert.assertThat(
                    "clients shouldn't have same task assignment",
                    leftTasks,
                    Matchers.not(Matchers.equalTo(rightTasks))
                );
            }
        }
    }

    /**
     * Same pairwise check as the test without prior state, but three of the four clients are
     * created with a non-empty first task set (presumably their previously active tasks, per
     * the test name -- confirm against {@code mkStreamState}). No two distinct clients may end
     * up with an identical set of task ids.
     *
     * @param str strategy name supplied by {@code @ValueSource} and forwarded to the assignment configs
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks(String str) {
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_3, true)
        );
        Map<ProcessId, KafkaStreamsState> clients = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_3), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty(), Set.of(AssignmentTestUtils.TASK_0_0), Set.of()),
            TaskAssignmentUtilsTest.mkStreamState(4, 1, Optional.empty())
        );
        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(clients, tasks, 1, str);

        // Compare every ordered pair of distinct clients.
        for (KafkaStreamsState first : clients.values()) {
            Set<TaskId> firstTaskIds = assignments.get(first.processId()).tasks().keySet();
            for (KafkaStreamsState second : clients.values()) {
                if (first.processId().equals(second.processId())) {
                    continue;
                }
                Set<TaskId> secondTaskIds = assignments.get(second.processId()).tasks().keySet();
                MatcherAssert.assertThat(
                    "clients shouldn't have same task assignment",
                    firstTaskIds,
                    Matchers.not(Matchers.equalTo(secondTaskIds))
                );
            }
        }
    }

    /**
     * Assigns three tasks (only {@code TASK_0_0} is created with the {@code true} flag) across
     * four clients with a replica count of 3. Expects client 1 to hold no standby tasks and
     * clients 2, 3 and 4 to each hold exactly {@code TASK_0_0} as a standby.
     *
     * @param str strategy name supplied by {@code @ValueSource} and forwarded to the assignment configs
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void shouldAssignMultipleStandbys(String str) {
        Map<ProcessId, KafkaStreamsState> clients = Utils.mkMap(
            TaskAssignmentUtilsTest.mkStreamState(1, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(2, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(3, 1, Optional.empty()),
            TaskAssignmentUtilsTest.mkStreamState(4, 1, Optional.empty())
        );
        Map<TaskId, TaskInfo> tasks = Utils.mkMap(
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_0, true),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_1, false),
            TaskAssignmentUtilsTest.mkTaskInfo(AssignmentTestUtils.TASK_0_2, false)
        );
        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(clients, tasks, 3, str);

        Set<TaskId> onlyTask00 = Set.of(AssignmentTestUtils.TASK_0_0);
        MatcherAssert.assertThat(standbyTasks(assignments, 1), Matchers.equalTo(Set.of()));
        MatcherAssert.assertThat(standbyTasks(assignments, 2), Matchers.equalTo(onlyTask00));
        MatcherAssert.assertThat(standbyTasks(assignments, 3), Matchers.equalTo(onlyTask00));
        MatcherAssert.assertThat(standbyTasks(assignments, 4), Matchers.equalTo(onlyTask00));
    }

    /**
     * Builds 10 x 30 = 300 tasks and 20 clients, each tagged with a {@code rack-N} string, runs
     * the assignor with a replica count of 2, and expects 300 active and 600 standby tasks in
     * total. The {@code @Timeout} annotation bounds how long the assignment may take.
     *
     * @param str strategy name supplied by {@code @ValueSource} and forwarded to the assignment configs
     */
    @Timeout(value = 3, unit = TimeUnit.MINUTES)
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void largeAssignmentShouldTerminateWithinAcceptableTime(String str) {
        Map<TaskId, TaskInfo> tasks = Utils.mkMap();
        for (int subtopology = 0; subtopology < 10; subtopology++) {
            for (int partition = 0; partition < 30; partition++) {
                String rack = String.format("rack-%d", (subtopology * partition) % 31);
                Map.Entry<TaskId, TaskInfo> taskEntry =
                    TaskAssignmentUtilsTest.mkTaskInfo(new TaskId(subtopology, partition), true, Set.of(rack));
                tasks.put(taskEntry.getKey(), taskEntry.getValue());
            }
        }

        Map<ProcessId, KafkaStreamsState> clients = Utils.mkMap();
        for (int index = 0; index < 20; index++) {
            Optional<String> rack = Optional.of(String.format("rack-%d", index % 31));
            Map.Entry<ProcessId, KafkaStreamsState> clientEntry =
                TaskAssignmentUtilsTest.mkStreamState(index + 1, 50, rack, Set.of(), Set.of());
            clients.put(clientEntry.getKey(), clientEntry.getValue());
        }

        AssignmentConfigs configs = new AssignmentConfigs(
            0L, 1, 2, 60000L, Collections.emptyList(), OptionalInt.of(1), OptionalInt.of(2), str);
        Map<ProcessId, KafkaStreamsAssignment> assignments = assign(clients, tasks, configs);

        // Tally task types across all clients in a single pass.
        int activeCount = 0;
        int standbyCount = 0;
        for (KafkaStreamsAssignment.AssignedTask task : allTasks(assignments)) {
            if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
                activeCount++;
            } else if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY) {
                standbyCount++;
            }
        }
        MatcherAssert.assertThat(activeCount, Matchers.equalTo(300));
        MatcherAssert.assertThat(standbyCount, Matchers.equalTo(600));
    }

    /**
     * Convenience overload that runs the assignment with a replica count of 0.
     *
     * @param map  client states keyed by process id
     * @param map2 task infos keyed by task id
     * @param str  strategy name forwarded to the assignment configs
     * @return the resulting assignments keyed by process id
     */
    private Map<ProcessId, KafkaStreamsAssignment> assign(Map<ProcessId, KafkaStreamsState> map, Map<TaskId, TaskInfo> map2, String str) {
        final int replicaCount = 0;
        return assign(map, map2, replicaCount, str);
    }

    /**
     * Builds the default configs for the given replica count and strategy, then runs the
     * assignment with them.
     *
     * @param map  client states keyed by process id
     * @param map2 task infos keyed by task id
     * @param i    value passed to {@link #defaultAssignmentConfigs(int, String)} as its first argument
     * @param str  strategy name forwarded to the assignment configs
     * @return the resulting assignments keyed by process id
     */
    private Map<ProcessId, KafkaStreamsAssignment> assign(Map<ProcessId, KafkaStreamsState> map, Map<TaskId, TaskInfo> map2, int i, String str) {
        AssignmentConfigs configs = defaultAssignmentConfigs(i, str);
        return assign(map, map2, configs);
    }

    /**
     * Runs the assignor under test against a test application state, asserts that
     * {@code TaskAssignmentUtils.validateTaskAssignment} reports {@code AssignmentError.NONE},
     * and returns the assignments indexed by process id.
     *
     * @param map               client states keyed by process id
     * @param map2              task infos keyed by task id
     * @param assignmentConfigs configs handed to the test application state
     * @return the resulting assignments keyed by process id
     */
    private Map<ProcessId, KafkaStreamsAssignment> assign(Map<ProcessId, KafkaStreamsState> map, Map<TaskId, TaskInfo> map2, AssignmentConfigs assignmentConfigs) {
        TaskAssignmentUtilsTest.TestApplicationState applicationState =
            new TaskAssignmentUtilsTest.TestApplicationState(assignmentConfigs, map, map2);
        TaskAssignor.TaskAssignment result = this.assignor.assign(applicationState);

        // Every assignment produced in these tests must pass validation before it is inspected.
        TaskAssignor.AssignmentError validation =
            TaskAssignmentUtils.validateTaskAssignment(applicationState, result);
        MatcherAssert.assertThat(validation, Matchers.equalTo(TaskAssignor.AssignmentError.NONE));

        return indexAssignment(result.assignment());
    }

    /**
     * Creates the {@link AssignmentConfigs} used by most tests: fixed values for everything
     * except the third constructor argument ({@code i}) and the last one ({@code str}).
     *
     * <p>NOTE(review): the constructor order is presumably acceptable recovery lag, max warmup
     * replicas, number of standby replicas, probing rebalance interval (ms), rack-aware tags,
     * traffic cost, non-overlap cost, rack-aware strategy -- confirm against the
     * {@code AssignmentConfigs} Javadoc.
     *
     * @param i   third constructor argument (presumably the number of standby replicas)
     * @param str last constructor argument (the strategy name used by the parameterized tests)
     * @return a new configs instance
     */
    public AssignmentConfigs defaultAssignmentConfigs(int i, String str) {
        final OptionalInt unset = OptionalInt.empty();
        return new AssignmentConfigs(
            0L,
            1,
            i,
            60_000L,
            Collections.emptyList(),
            unset,
            unset,
            str
        );
    }

    /**
     * Indexes the given assignments by their process id.
     *
     * @param collection assignments to index
     * @return a map from each assignment's {@code processId()} to the assignment itself
     */
    private Map<ProcessId, KafkaStreamsAssignment> indexAssignment(Collection<KafkaStreamsAssignment> collection) {
        return collection.stream()
            .collect(Collectors.toMap(KafkaStreamsAssignment::processId, assignment -> assignment));
    }

    /**
     * Collects the ids of all {@code ACTIVE} tasks assigned to the client with the given number.
     *
     * @param map assignments keyed by process id
     * @param i   client number, converted to a process id via {@code TaskAssignmentUtilsTest.processId}
     * @return the active task ids, or an empty set when the client has no assignment entry
     */
    private Set<TaskId> activeTasks(Map<ProcessId, KafkaStreamsAssignment> map, int i) {
        KafkaStreamsAssignment assignment = map.get(TaskAssignmentUtilsTest.processId(i));
        if (assignment == null) {
            return Set.of();
        }
        return assignment.tasks().values().stream()
            .filter(task -> task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE)
            .map(KafkaStreamsAssignment.AssignedTask::id)
            .collect(Collectors.toSet());
    }

    /**
     * Collects the ids of all {@code STANDBY} tasks assigned to the client with the given number.
     *
     * @param map assignments keyed by process id
     * @param i   client number, converted to a process id via {@code TaskAssignmentUtilsTest.processId}
     * @return the standby task ids, or an empty set when the client has no assignment entry
     */
    private Set<TaskId> standbyTasks(Map<ProcessId, KafkaStreamsAssignment> map, int i) {
        KafkaStreamsAssignment assignment = map.get(TaskAssignmentUtilsTest.processId(i));
        if (assignment == null) {
            return Set.of();
        }
        return assignment.tasks().values().stream()
            .filter(task -> task.type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY)
            .map(KafkaStreamsAssignment.AssignedTask::id)
            .collect(Collectors.toSet());
    }

    /**
     * Flattens the assigned tasks of every client into a single list.
     *
     * @param map assignments keyed by process id
     * @return all assigned tasks, in the iteration order of {@code map.values()}
     */
    private List<KafkaStreamsAssignment.AssignedTask> allTasks(Map<ProcessId, KafkaStreamsAssignment> map) {
        List<KafkaStreamsAssignment.AssignedTask> collected = new ArrayList<>();
        for (KafkaStreamsAssignment assignment : map.values()) {
            collected.addAll(assignment.tasks().values());
        }
        return collected;
    }

    /**
     * Asserts that the client with the given number has an assignment entry containing the
     * given task with the given type.
     *
     * @param map    assignments keyed by process id
     * @param i      client number, converted to a process id via {@code TaskAssignmentUtilsTest.processId}
     * @param taskId task expected to be assigned to that client
     * @param type   expected type of the assigned task
     */
    private void assertHasAssignment(Map<ProcessId, KafkaStreamsAssignment> map, int i, TaskId taskId, KafkaStreamsAssignment.AssignedTask.Type type) {
        KafkaStreamsAssignment clientAssignment = map.get(TaskAssignmentUtilsTest.processId(i));
        MatcherAssert.assertThat(clientAssignment, Matchers.notNullValue());

        KafkaStreamsAssignment.AssignedTask found = clientAssignment.tasks().get(taskId);
        MatcherAssert.assertThat(found, Matchers.notNullValue());

        boolean sameId = found.id().equals(taskId);
        MatcherAssert.assertThat(sameId, Matchers.is(true));
        boolean sameType = found.type().equals(type);
        MatcherAssert.assertThat(sameType, Matchers.is(true));
    }

    /**
     * Asserts, for every client, that the sorted subtopology ids of its distinct assigned task
     * ids are exactly {@code [1, 2]}.
     *
     * <p>Despite the method name, tasks are not filtered by type here: every entry returned by
     * {@code tasks()} is considered.
     *
     * @param map assignments keyed by process id
     */
    private void assertActiveTaskTopicGroupIdsEvenlyDistributed(Map<ProcessId, KafkaStreamsAssignment> map) {
        for (KafkaStreamsAssignment assignment : map.values()) {
            // De-duplicate by task id first, as one subtopology id is recorded per distinct task.
            Set<TaskId> taskIds = assignment.tasks().values().stream()
                .map(KafkaStreamsAssignment.AssignedTask::id)
                .collect(Collectors.toSet());
            List<Integer> subtopologies = taskIds.stream()
                .map(TaskId::subtopology)
                .sorted()
                .collect(Collectors.toList());
            MatcherAssert.assertThat(subtopologies, Matchers.equalTo(Arrays.asList(1, 2)));
        }
    }
}
