package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag("integration")
@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.class */
public class StreamsAssignmentScaleTest {
    static final long MAX_ASSIGNMENT_DURATION = 120000;
    static final String APPLICATION_ID = "streams-assignment-scale-test";
    private static final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);

    @Timeout(300)
    @Test
    public void testHighAvailabilityTaskAssignorLargePartitionCount() {
        completeLargeAssignment(6000, 2, 1, 1, HighAvailabilityTaskAssignor.class);
    }

    @Timeout(300)
    @Test
    public void testHighAvailabilityTaskAssignorLargeNumConsumers() {
        completeLargeAssignment(1000, 1000, 1, 1, HighAvailabilityTaskAssignor.class);
    }

    @Timeout(300)
    @Test
    public void testHighAvailabilityTaskAssignorManyStandbys() {
        completeLargeAssignment(1000, 100, 1, 50, HighAvailabilityTaskAssignor.class);
    }

    @Timeout(300)
    @Test
    public void testHighAvailabilityTaskAssignorManyThreadsPerClient() {
        completeLargeAssignment(1000, 10, 1000, 1, HighAvailabilityTaskAssignor.class);
    }

    @Timeout(300)
    @Test
    public void testStickyTaskAssignorLargePartitionCount() {
        completeLargeAssignment(2000, 2, 1, 1, LegacyStickyTaskAssignor.class);
    }

    @Timeout(300)
    @Test
    public void testStickyTaskAssignorLargeNumConsumers() {
        completeLargeAssignment(1000, 1000, 1, 1, LegacyStickyTaskAssignor.class);
    }

    @Timeout(300)
    @Test
    public void testStickyTaskAssignorManyStandbys() {
        completeLargeAssignment(1000, 100, 1, 20, LegacyStickyTaskAssignor.class);
    }

    @Timeout(300)
    @Test
    public void testStickyTaskAssignorManyThreadsPerClient() {
        completeLargeAssignment(1000, 10, 1000, 1, LegacyStickyTaskAssignor.class);
    }

    @Timeout(300)
    @Test
    public void testFallbackPriorTaskAssignorLargePartitionCount() {
        completeLargeAssignment(2000, 2, 1, 1, FallbackPriorTaskAssignor.class);
    }

    @Timeout(300)
    @Test
    public void testFallbackPriorTaskAssignorLargeNumConsumers() {
        completeLargeAssignment(1000, 1000, 1, 1, FallbackPriorTaskAssignor.class);
    }

    @Timeout(300)
    @Test
    public void testFallbackPriorTaskAssignorManyStandbys() {
        completeLargeAssignment(1000, 100, 1, 20, FallbackPriorTaskAssignor.class);
    }

    @Timeout(300)
    @Test
    public void testFallbackPriorTaskAssignorManyThreadsPerClient() {
        completeLargeAssignment(1000, 10, 1000, 1, FallbackPriorTaskAssignor.class);
    }

    private void completeLargeAssignment(int i, int i2, int i3, int i4, Class<? extends LegacyTaskAssignor> cls) {
        List singletonList = Collections.singletonList(AssignmentTestUtils.TOPIC_PREFIX);
        HashMap hashMap = new HashMap();
        for (int i5 = 0; i5 < i; i5++) {
            hashMap.put(new TopicPartition("streams-assignment-scale-test-store-changelog", i5), 100000L);
        }
        ArrayList arrayList = new ArrayList();
        for (int i6 = 0; i6 < i; i6++) {
            arrayList.add(new PartitionInfo(AssignmentTestUtils.TOPIC_PREFIX, i6, Node.noNode(), new Node[0], new Node[0]));
        }
        Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), arrayList, Collections.emptySet(), Collections.emptySet());
        HashMap hashMap2 = new HashMap();
        hashMap2.put("application.id", APPLICATION_ID);
        hashMap2.put("bootstrap.servers", "localhost:8080");
        InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
        internalTopologyBuilder.addSource((AutoOffsetResetInternal) null, "source", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, new String[]{AssignmentTestUtils.TOPIC_PREFIX});
        internalTopologyBuilder.addProcessor("processor", new MockApiProcessorSupplier(), new String[]{"source"});
        internalTopologyBuilder.addStateStore(new MockKeyValueStoreBuilder("store", false), new String[]{"processor"});
        TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, new StreamsConfig(hashMap2));
        topologyMetadata.buildAndRewriteTopology();
        Consumer consumer = (Consumer) Mockito.mock(Consumer.class);
        TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
        Mockito.when(taskManager.topologyMetadata()).thenReturn(topologyMetadata);
        AdminClient createMockAdminClientForAssignor = AssignmentTestUtils.createMockAdminClientForAssignor(hashMap, true);
        ReferenceContainer referenceContainer = new ReferenceContainer();
        referenceContainer.mainConsumer = consumer;
        referenceContainer.adminClient = createMockAdminClientForAssignor;
        referenceContainer.taskManager = taskManager;
        referenceContainer.streamsMetadataState = (StreamsMetadataState) Mockito.mock(StreamsMetadataState.class);
        referenceContainer.time = new MockTime();
        hashMap2.put("__reference.container.instance__", referenceContainer);
        hashMap2.put("internal.task.assignor.class", cls.getName());
        hashMap2.put("num.standby.replicas", Integer.valueOf(i4));
        hashMap2.put("rack.aware.assignment.strategy", "none");
        MockInternalTopicManager mockInternalTopicManager = (MockInternalTopicManager) Mockito.spy(new MockInternalTopicManager(new MockTime(), new StreamsConfig(hashMap2), new MockClientSupplier().restoreConsumer, false));
        Mockito.lenient().when(mockInternalTopicManager.getTopicPartitionInfo(ArgumentMatchers.anySet())).thenAnswer(invocationOnMock -> {
            HashMap hashMap3 = new HashMap();
            for (String str : (Set) invocationOnMock.getArgument(0)) {
                ArrayList arrayList2 = new ArrayList();
                arrayList.forEach(partitionInfo -> {
                    arrayList2.add(new TopicPartitionInfo(partitionInfo.partition(), partitionInfo.leader(), Collections.singletonList(new Node(1, "host1", 80, "rack-1")), Arrays.asList(partitionInfo.inSyncReplicas())));
                });
                hashMap3.put(str, arrayList2);
            }
            return hashMap3;
        });
        StreamsPartitionAssignor streamsPartitionAssignor = new StreamsPartitionAssignor();
        streamsPartitionAssignor.configure(hashMap2);
        streamsPartitionAssignor.setInternalTopicManager(mockInternalTopicManager);
        HashMap hashMap3 = new HashMap();
        for (int i7 = 0; i7 < i2; i7++) {
            for (int i8 = 0; i8 < i3; i8++) {
                hashMap3.put(getConsumerName(i8, i7), new ConsumerPartitionAssignor.Subscription(singletonList, AssignmentTestUtils.getInfo(AssignmentTestUtils.processIdForInt(i7), AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode(), Collections.emptyList(), -1, Optional.of(String.format("rack-%d", Integer.valueOf(i7 % 31)))));
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        Map groupAssignment = streamsPartitionAssignor.assign(cluster, new ConsumerPartitionAssignor.GroupSubscription(hashMap3)).groupAssignment();
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        if (currentTimeMillis2 > MAX_ASSIGNMENT_DURATION) {
            throw new AssertionError("The first assignment took too long to complete at " + currentTimeMillis2 + "ms.");
        }
        log.info("First assignment took {}ms.", Long.valueOf(currentTimeMillis2));
        for (int i9 = 0; i9 < i2; i9++) {
            for (int i10 = 0; i10 < i3; i10++) {
                String consumerName = getConsumerName(i10, i9);
                ConsumerPartitionAssignor.Assignment assignment = (ConsumerPartitionAssignor.Assignment) groupAssignment.get(consumerName);
                AssignmentInfo decode = AssignmentInfo.decode(assignment.userData());
                hashMap3.put(consumerName, new ConsumerPartitionAssignor.Subscription(singletonList, AssignmentTestUtils.getInfo(AssignmentTestUtils.processIdForInt(i9), new HashSet(decode.activeTasks()), decode.standbyTasks().keySet()).encode(), assignment.partitions()));
            }
        }
        long currentTimeMillis3 = System.currentTimeMillis();
        Map groupAssignment2 = streamsPartitionAssignor.assign(cluster, new ConsumerPartitionAssignor.GroupSubscription(hashMap3)).groupAssignment();
        long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis3;
        if (currentTimeMillis4 > MAX_ASSIGNMENT_DURATION) {
            throw new AssertionError("The second assignment took too long to complete at " + currentTimeMillis4 + "ms.");
        }
        log.info("Second assignment took {}ms.", Long.valueOf(currentTimeMillis4));
        MatcherAssert.assertThat(Integer.valueOf(groupAssignment2.size()), CoreMatchers.is(Integer.valueOf(i2 * i3)));
    }

    private String getConsumerName(int i, int i2) {
        return "consumer-" + i2 + "-" + i;
    }
}
