package org.apache.kafka.streams.processor.internals.assignment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Supplier;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.class */
public class TaskAssignorConvergenceTest {
    private static Random random;
    private static final Time TIME = new MockTime();
    private int skewThreshold = 1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest$Harness.class */
    public static final class Harness {
        private final Set<TaskId> statelessTasks;
        private final Map<TaskId, Long> statefulTaskEndOffsetSums;
        private final Map<ProcessId, ClientState> clientStates;
        public final Map<TaskId, Set<TopicPartition>> partitionsForTask;
        public final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask;
        public final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksForTopicGroup;
        public final Cluster fullMetadata;
        public final Map<ProcessId, Map<String, Optional<String>>> racksForProcessConsumer;
        public final InternalTopicManager internalTopicManager;
        private final StringBuilder history = new StringBuilder();
        private final Map<ProcessId, ClientState> droppedClientStates = new TreeMap();

        private static Harness initializeCluster(int i, int i2, int i3, Supplier<Integer> supplier, int i4) {
            int i5 = 0;
            TreeSet treeSet = new TreeSet();
            int i6 = i;
            List<Node> randomNodes = AssignmentTestUtils.getRandomNodes(i4);
            int i7 = 0;
            HashSet hashSet = new HashSet();
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            HashMap hashMap3 = new HashMap();
            while (i6 > 0) {
                int min = Math.min(i6, supplier.get().intValue());
                for (int i8 = 0; i8 < min; i8++) {
                    TaskId taskId = new TaskId(i5, i8);
                    treeSet.add(taskId);
                    i6--;
                    Node[] randomReplica = AssignmentTestUtils.getRandomReplica(randomNodes, i7, i8);
                    hashSet.add(new PartitionInfo("topic_" + i5, i8, randomReplica[0], randomReplica, randomReplica));
                    i7++;
                    hashMap.put(taskId, Set.of(new TopicPartition("topic_" + i5, i8)));
                    ((Set) hashMap3.computeIfAbsent(new TopologyMetadata.Subtopology(i5, (String) null), subtopology -> {
                        return new HashSet();
                    })).add(taskId);
                }
                i5++;
            }
            TreeMap treeMap = new TreeMap();
            HashMap hashMap4 = new HashMap();
            HashSet hashSet2 = new HashSet();
            int i9 = i2;
            while (i9 > 0) {
                String str = "changelog-topic_" + i5;
                hashSet2.add(str);
                int min2 = Math.min(i9, supplier.get().intValue());
                for (int i10 = 0; i10 < min2; i10++) {
                    TaskId taskId2 = new TaskId(i5, i10);
                    treeMap.put(taskId2, 150000L);
                    i9--;
                    Node[] randomReplica2 = AssignmentTestUtils.getRandomReplica(randomNodes, i7, i10);
                    hashSet.add(new PartitionInfo("topic_" + i5, i10, randomReplica2[0], randomReplica2, randomReplica2));
                    i7++;
                    hashMap.put(taskId2, Set.of(new TopicPartition("topic_" + i5, i10)));
                    hashMap2.put(taskId2, Set.of(new TopicPartition(str, i10)));
                    ((Set) hashMap3.computeIfAbsent(new TopologyMetadata.Subtopology(i5, (String) null), subtopology2 -> {
                        return new HashSet();
                    })).add(taskId2);
                    Node[] randomReplica3 = AssignmentTestUtils.getRandomReplica(randomNodes, TaskAssignorConvergenceTest.random.nextInt(randomNodes.size()), i10);
                    ((List) hashMap4.computeIfAbsent(str, str2 -> {
                        return new ArrayList();
                    })).add(new TopicPartitionInfo(i10, randomReplica3[0], Arrays.asList(randomReplica3[0], randomReplica3[1]), Collections.emptyList()));
                }
                i5++;
            }
            InternalTopicManager internalTopicManager = (InternalTopicManager) Mockito.spy(new MockInternalTopicManager(new MockTime(), new StreamsConfig(AssignmentTestUtils.configProps("min_traffic")), new MockClientSupplier().restoreConsumer, false));
            ((InternalTopicManager) Mockito.doReturn(hashMap4).when(internalTopicManager)).getTopicPartitionInfo(hashSet2);
            Cluster cluster = new Cluster("cluster", new HashSet(randomNodes), hashSet, Collections.emptySet(), Collections.emptySet());
            TreeMap treeMap2 = new TreeMap();
            HashMap hashMap5 = new HashMap();
            for (int i11 = 0; i11 < i3; i11++) {
                ProcessId processIdForInt = AssignmentTestUtils.processIdForInt(i11);
                treeMap2.put(processIdForInt, emptyInstance(processIdForInt, treeMap));
                hashMap5.put(processIdForInt, Utils.mkMap(new Map.Entry[]{Utils.mkEntry("consumer", Optional.of("rack" + TaskAssignorConvergenceTest.random.nextInt(randomNodes.size())))}));
            }
            return new Harness(treeSet, treeMap, treeMap2, cluster, hashMap, hashMap2, hashMap3, hashMap5, internalTopicManager);
        }

        private Harness(Set<TaskId> set, Map<TaskId, Long> map, Map<ProcessId, ClientState> map2, Cluster cluster, Map<TaskId, Set<TopicPartition>> map3, Map<TaskId, Set<TopicPartition>> map4, Map<TopologyMetadata.Subtopology, Set<TaskId>> map5, Map<ProcessId, Map<String, Optional<String>>> map6, InternalTopicManager internalTopicManager) {
            this.statelessTasks = set;
            this.statefulTaskEndOffsetSums = map;
            this.clientStates = map2;
            this.fullMetadata = cluster;
            this.partitionsForTask = map3;
            this.changelogPartitionsForTask = map4;
            this.tasksForTopicGroup = map5;
            this.racksForProcessConsumer = map6;
            this.internalTopicManager = internalTopicManager;
            this.history.append('\n');
            this.history.append("Cluster and application initial state: \n");
            this.history.append("Stateless tasks: ").append(set).append('\n');
            this.history.append("Stateful tasks:  ").append(map.keySet()).append('\n');
            this.history.append("Full metadata:  ").append(cluster).append('\n');
            this.history.append("Partitions for tasks:  ").append(map3).append('\n');
            this.history.append("Changelog partitions for tasks:  ").append(map4).append('\n');
            this.history.append("Tasks for subtopology:  ").append(map5).append('\n');
            this.history.append("Racks for process consumer:  ").append(map6).append('\n');
            formatClientStates(true);
            this.history.append("History of the cluster: \n");
        }

        private void addClient() {
            ProcessId processIdForInt = AssignmentTestUtils.processIdForInt(this.clientStates.size() + this.droppedClientStates.size());
            this.history.append("Adding new node ").append(processIdForInt).append('\n');
            this.clientStates.put(processIdForInt, emptyInstance(processIdForInt, this.statefulTaskEndOffsetSums));
            this.racksForProcessConsumer.computeIfAbsent(processIdForInt, processId -> {
                return new HashMap();
            }).put("consumer", Optional.of("rack" + TaskAssignorConvergenceTest.random.nextInt(this.fullMetadata.nodes().size())));
        }

        private static ClientState emptyInstance(ProcessId processId, Map<TaskId, Long> map) {
            ClientState clientState = new ClientState(processId, 1);
            clientState.computeTaskLags(processId, map);
            return clientState;
        }

        private void addOrResurrectClientsRandomly(Random random, int i) {
            int nextInt = random.nextInt(i);
            for (int i2 = 0; i2 < nextInt; i2++) {
                if (random.nextBoolean() || this.droppedClientStates.isEmpty()) {
                    addClient();
                } else {
                    ProcessId selectRandomElement = selectRandomElement(random, this.droppedClientStates);
                    this.history.append("Resurrecting node ").append(selectRandomElement).append('\n');
                    this.clientStates.put(selectRandomElement, this.droppedClientStates.get(selectRandomElement));
                    this.droppedClientStates.remove(selectRandomElement);
                }
            }
        }

        private void dropClient() {
            if (this.clientStates.isEmpty()) {
                throw new NoSuchElementException("There are no nodes to drop");
            }
            dropClient(this.clientStates.keySet().iterator().next());
        }

        private void dropRandomClients(int i, Random random) {
            for (int i2 = 0; !this.clientStates.isEmpty() && i2 < i; i2++) {
                dropClient(selectRandomElement(random, this.clientStates));
            }
            this.history.append("Stateless tasks: ").append(this.statelessTasks).append('\n');
            this.history.append("Stateful tasks:  ").append(this.statefulTaskEndOffsetSums.keySet()).append('\n');
            formatClientStates(true);
        }

        private void dropClient(ProcessId processId) {
            ClientState remove = this.clientStates.remove(processId);
            this.history.append("Dropping node ").append(processId).append(": ").append(remove).append('\n');
            this.droppedClientStates.put(processId, remove);
        }

        private static ProcessId selectRandomElement(Random random, Map<ProcessId, ClientState> map) {
            int nextInt = random.nextInt(map.size());
            ProcessId processId = null;
            Iterator<ProcessId> it = map.keySet().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                ProcessId next = it.next();
                if (nextInt == 0) {
                    processId = next;
                    break;
                }
                nextInt--;
            }
            return processId;
        }

        private void prepareForNextRebalance() {
            TreeMap treeMap = new TreeMap();
            for (Map.Entry<ProcessId, ClientState> entry : this.clientStates.entrySet()) {
                ProcessId key = entry.getKey();
                ClientState clientState = new ClientState(key, 1);
                ClientState value = entry.getValue();
                TreeMap treeMap2 = new TreeMap();
                for (TaskId taskId : value.activeTasks()) {
                    if (this.statefulTaskEndOffsetSums.containsKey(taskId)) {
                        treeMap2.put(taskId, this.statefulTaskEndOffsetSums.get(taskId));
                    }
                }
                for (TaskId taskId2 : value.standbyTasks()) {
                    if (this.statefulTaskEndOffsetSums.containsKey(taskId2)) {
                        treeMap2.put(taskId2, this.statefulTaskEndOffsetSums.get(taskId2));
                    }
                }
                clientState.addPreviousActiveTasks(value.activeTasks());
                clientState.addPreviousStandbyTasks(value.standbyTasks());
                clientState.addPreviousTasksAndOffsetSums("consumer", treeMap2);
                clientState.computeTaskLags(key, this.statefulTaskEndOffsetSums);
                treeMap.put(key, clientState);
            }
            this.clientStates.clear();
            this.clientStates.putAll(treeMap);
        }

        private void recordConfig(AssignmentConfigs assignmentConfigs) {
            this.history.append("Creating assignor with configuration: ").append(assignmentConfigs).append('\n');
        }

        private void recordBefore(int i) {
            this.history.append("Starting Iteration: ").append(i).append('\n');
            formatClientStates(false);
        }

        private void recordAfter(int i, boolean z) {
            this.history.append("After assignment:  ").append(i).append('\n');
            this.history.append("Rebalance pending: ").append(z).append('\n');
            formatClientStates(true);
            this.history.append('\n');
        }

        private void formatClientStates(boolean z) {
            AssignmentTestUtils.appendClientStates(this.history, this.clientStates);
            if (z) {
                TreeSet treeSet = new TreeSet();
                treeSet.addAll(this.statefulTaskEndOffsetSums.keySet());
                treeSet.addAll(this.statelessTasks);
                Iterator<Map.Entry<ProcessId, ClientState>> it = this.clientStates.entrySet().iterator();
                while (it.hasNext()) {
                    treeSet.removeAll(it.next().getValue().assignedTasks());
                }
                this.history.append("Unassigned Tasks: ").append(treeSet).append('\n');
            }
        }
    }

    @BeforeAll
    public static void beforeClass() {
        long currentTimeMillis = System.currentTimeMillis();
        System.out.println("Seed is " + currentTimeMillis);
        random = new Random(currentTimeMillis);
    }

    public void setUp(String str) {
        if (str.equals("balance_subtopology")) {
            this.skewThreshold = 4;
        }
    }

    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void staticAssignmentShouldConvergeWithTheFirstAssignment(String str) {
        setUp(str);
        AssignmentConfigs assignmentConfigs = new AssignmentConfigs(100L, 2, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, (OptionalInt) null, (OptionalInt) null, str);
        Harness initializeCluster = Harness.initializeCluster(1, 1, 1, () -> {
            return 1;
        }, 1);
        testForConvergence(initializeCluster, assignmentConfigs, 1);
        verifyValidAssignment(0, initializeCluster);
        verifyBalancedAssignment(initializeCluster, this.skewThreshold);
    }

    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void assignmentShouldConvergeAfterAddingNode(String str) {
        setUp(str);
        AssignmentConfigs assignmentConfigs = new AssignmentConfigs(100L, 2, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, (OptionalInt) null, (OptionalInt) null, str);
        Harness initializeCluster = Harness.initializeCluster(7, 11, 1, () -> {
            return 5;
        }, 10);
        testForConvergence(initializeCluster, assignmentConfigs, 1);
        initializeCluster.addClient();
        testForConvergence(initializeCluster, assignmentConfigs, 6);
        verifyValidAssignment(0, initializeCluster);
        if (str.equals("min_traffic")) {
            return;
        }
        verifyBalancedAssignment(initializeCluster, this.skewThreshold);
    }

    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void droppingNodesShouldConverge(String str) {
        setUp(str);
        AssignmentConfigs assignmentConfigs = new AssignmentConfigs(100L, 2, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, (OptionalInt) null, (OptionalInt) null, str);
        Harness initializeCluster = Harness.initializeCluster(11, 13, 7, () -> {
            return 5;
        }, 10);
        testForConvergence(initializeCluster, assignmentConfigs, 1);
        initializeCluster.dropClient();
        testForConvergence(initializeCluster, assignmentConfigs, 8);
        verifyValidAssignment(0, initializeCluster);
        if (str.equals("min_traffic")) {
            return;
        }
        verifyBalancedAssignment(initializeCluster, this.skewThreshold);
    }

    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    @ParameterizedTest
    public void randomClusterPerturbationsShouldConverge(String str) {
        setUp(str);
        long currentTimeMillis = System.currentTimeMillis() + 10000;
        do {
            runRandomizedScenario(new Random().nextLong(), str);
        } while (System.currentTimeMillis() < currentTimeMillis);
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:10:0x00b8. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:14:0x010d A[Catch: AssertionError -> 0x0140, Throwable -> 0x0152, TryCatch #2 {AssertionError -> 0x0140, Throwable -> 0x0152, blocks: (B:3:0x0003, B:5:0x009b, B:9:0x00ae, B:10:0x00b8, B:11:0x00d4, B:12:0x0100, B:14:0x010d, B:16:0x012e, B:18:0x0137, B:21:0x00e5, B:23:0x00f1, B:24:0x00ff), top: B:2:0x0003 }] */
    /* JADX WARN: Removed duplicated region for block: B:20:0x0137 A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void runRandomizedScenario(long r14, java.lang.String r16) {
        /*
            Method dump skipped, instructions count: 395
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(long, java.lang.String):void");
    }

    private static void verifyBalancedAssignment(Harness harness, int i) {
        Set<TaskId> keySet = harness.statefulTaskEndOffsetSums.keySet();
        Map<ProcessId, ClientState> map = harness.clientStates;
        StringBuilder sb = harness.history;
        AssignmentTestUtils.assertBalancedActiveAssignment(map, sb);
        AssignmentTestUtils.assertBalancedStatefulAssignment(keySet, map, sb);
        AssignmentTestUtils.TaskSkewReport analyzeTaskAssignmentBalance = AssignmentTestUtils.analyzeTaskAssignmentBalance(harness.clientStates, i);
        if (analyzeTaskAssignmentBalance.totalSkewedTasks() > 0) {
            Assertions.fail("Expected a balanced task assignment, but was: " + String.valueOf(analyzeTaskAssignmentBalance) + "\n" + String.valueOf(sb));
        }
    }

    private static void verifyValidAssignment(int i, Harness harness) {
        AssignmentTestUtils.assertValidAssignment(i, harness.statefulTaskEndOffsetSums.keySet(), harness.statelessTasks, harness.clientStates, harness.history);
    }

    private static void testForConvergence(Harness harness, AssignmentConfigs assignmentConfigs, int i) {
        TreeSet treeSet = new TreeSet();
        treeSet.addAll(harness.statelessTasks);
        treeSet.addAll(harness.statefulTaskEndOffsetSums.keySet());
        harness.recordConfig(assignmentConfigs);
        boolean z = true;
        int i2 = 0;
        RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor(harness.fullMetadata, harness.partitionsForTask, harness.changelogPartitionsForTask, harness.tasksForTopicGroup, harness.racksForProcessConsumer, harness.internalTopicManager, assignmentConfigs, TIME);
        while (z && i2 < i) {
            i2++;
            harness.prepareForNextRebalance();
            harness.recordBefore(i2);
            z = new HighAvailabilityTaskAssignor().assign(harness.clientStates, treeSet, harness.statefulTaskEndOffsetSums.keySet(), rackAwareTaskAssignor, assignmentConfigs);
            harness.recordAfter(i2, z);
        }
        if (z) {
            Assertions.fail("Rebalances have not converged after iteration cutoff: " + i + String.valueOf(harness.history));
        }
    }
}
