package org.apache.kafka.connect.runtime.distributed;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.Mock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.easymock.PowerMock;
import org.powermock.reflect.Whitebox;

/* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.class */
public class WorkerCoordinatorTest {
    private static final String LEADER_URL = "leaderUrl:8083";
    private static final String MEMBER_URL = "memberUrl:8083";
    private MockTime time;
    private MockClient client;
    private Metadata metadata;
    private Metrics metrics;
    private ConsumerNetworkClient consumerClient;
    private MockRebalanceListener rebalanceListener;

    @Mock
    private KafkaConfigBackingStore configStorage;
    private WorkerCoordinator coordinator;
    private ClusterConfigState configState1;
    private ClusterConfigState configState2;
    private ClusterConfigState configStateSingleTaskConnectors;
    private String connectorId1 = "connector1";
    private String connectorId2 = "connector2";
    private String connectorId3 = "connector3";
    private ConnectorTaskId taskId1x0 = new ConnectorTaskId(this.connectorId1, 0);
    private ConnectorTaskId taskId1x1 = new ConnectorTaskId(this.connectorId1, 1);
    private ConnectorTaskId taskId2x0 = new ConnectorTaskId(this.connectorId2, 0);
    private ConnectorTaskId taskId3x0 = new ConnectorTaskId(this.connectorId3, 0);
    private String groupId = "test-group";
    private int sessionTimeoutMs = 10;
    private int rebalanceTimeoutMs = 60;
    private int heartbeatIntervalMs = 2;
    private long retryBackoffMs = 100;
    private Cluster cluster = TestUtils.singletonCluster("topic", 1);
    private Node node = (Node) this.cluster.nodes().get(0);

    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest$MockRebalanceListener.class */
    private static class MockRebalanceListener implements WorkerRebalanceListener {
        public ConnectProtocol.Assignment assignment;
        public String revokedLeader;
        public Collection<String> revokedConnectors;
        public Collection<ConnectorTaskId> revokedTasks;
        public int revokedCount;
        public int assignedCount;

        private MockRebalanceListener() {
            this.assignment = null;
            this.revokedCount = 0;
            this.assignedCount = 0;
        }

        public void onAssigned(ConnectProtocol.Assignment assignment, int i) {
            this.assignment = assignment;
            this.assignedCount++;
        }

        public void onRevoked(String str, Collection<String> collection, Collection<ConnectorTaskId> collection2) {
            this.revokedLeader = str;
            this.revokedConnectors = collection;
            this.revokedTasks = collection2;
            this.revokedCount++;
        }
    }

    @Before
    public void setup() {
        LogContext logContext = new LogContext();
        this.time = new MockTime();
        this.client = new MockClient(this.time);
        this.metadata = new Metadata(0L, Long.MAX_VALUE, true);
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.consumerClient = new ConsumerNetworkClient(logContext, this.client, this.metadata, this.time, 100L, 1000, this.heartbeatIntervalMs);
        this.metrics = new Metrics(this.time);
        this.rebalanceListener = new MockRebalanceListener();
        this.configStorage = (KafkaConfigBackingStore) PowerMock.createMock(KafkaConfigBackingStore.class);
        this.client.setNode(this.node);
        this.coordinator = new WorkerCoordinator(logContext, this.consumerClient, this.groupId, this.rebalanceTimeoutMs, this.sessionTimeoutMs, this.heartbeatIntervalMs, this.metrics, "consumer" + this.groupId, this.time, this.retryBackoffMs, LEADER_URL, this.configStorage, this.rebalanceListener);
        this.configState1 = new ClusterConfigState(1L, Collections.singletonMap(this.connectorId1, 1), Collections.singletonMap(this.connectorId1, new HashMap()), Collections.singletonMap(this.connectorId1, TargetState.STARTED), Collections.singletonMap(this.taskId1x0, new HashMap()), Collections.emptySet());
        HashMap hashMap = new HashMap();
        hashMap.put(this.connectorId1, 2);
        hashMap.put(this.connectorId2, 1);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.connectorId1, new HashMap());
        hashMap2.put(this.connectorId2, new HashMap());
        HashMap hashMap3 = new HashMap();
        hashMap3.put(this.connectorId1, TargetState.STARTED);
        hashMap3.put(this.connectorId2, TargetState.STARTED);
        HashMap hashMap4 = new HashMap();
        hashMap4.put(this.taskId1x0, new HashMap());
        hashMap4.put(this.taskId1x1, new HashMap());
        hashMap4.put(this.taskId2x0, new HashMap());
        this.configState2 = new ClusterConfigState(2L, hashMap, hashMap2, hashMap3, hashMap4, Collections.emptySet());
        HashMap hashMap5 = new HashMap();
        hashMap5.put(this.connectorId1, 1);
        hashMap5.put(this.connectorId2, 1);
        hashMap5.put(this.connectorId3, 1);
        HashMap hashMap6 = new HashMap();
        hashMap6.put(this.connectorId1, new HashMap());
        hashMap6.put(this.connectorId2, new HashMap());
        hashMap6.put(this.connectorId3, new HashMap());
        HashMap hashMap7 = new HashMap();
        hashMap7.put(this.connectorId1, TargetState.STARTED);
        hashMap7.put(this.connectorId2, TargetState.STARTED);
        hashMap7.put(this.connectorId3, TargetState.STARTED);
        HashMap hashMap8 = new HashMap();
        hashMap8.put(this.taskId1x0, new HashMap());
        hashMap8.put(this.taskId2x0, new HashMap());
        hashMap8.put(this.taskId3x0, new HashMap());
        this.configStateSingleTaskConnectors = new ClusterConfigState(2L, hashMap5, hashMap6, hashMap7, hashMap8, Collections.emptySet());
    }

    @After
    public void teardown() {
        this.metrics.close();
    }

    @Test
    public void testMetadata() {
        EasyMock.expect(this.configStorage.snapshot()).andReturn(this.configState1);
        PowerMock.replayAll(new Object[0]);
        List metadata = this.coordinator.metadata();
        Assert.assertEquals(1L, metadata.size());
        JoinGroupRequest.ProtocolMetadata protocolMetadata = (JoinGroupRequest.ProtocolMetadata) metadata.get(0);
        Assert.assertEquals("default", protocolMetadata.name());
        Assert.assertEquals(1L, ConnectProtocol.deserializeMetadata(protocolMetadata.metadata()).offset());
        PowerMock.verifyAll();
    }

    @Test
    public void testNormalJoinGroupLeader() {
        EasyMock.expect(this.configStorage.snapshot()).andReturn(this.configState1);
        PowerMock.replayAll(new Object[0]);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        HashMap hashMap = new HashMap();
        hashMap.put("leader", 1L);
        hashMap.put("member", 1L);
        this.client.prepareResponse(joinGroupLeaderResponse(1, "leader", hashMap, Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest.1
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.memberId().equals("leader") && syncGroupRequest.generationId() == 1 && syncGroupRequest.groupAssignment().containsKey("leader");
            }
        }, syncGroupResponse((short) 0, "leader", 1L, Collections.singletonList(this.connectorId1), Collections.emptyList(), Errors.NONE));
        this.coordinator.ensureActiveGroup();
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(0L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertFalse(this.rebalanceListener.assignment.failed());
        Assert.assertEquals(1L, this.rebalanceListener.assignment.offset());
        Assert.assertEquals("leader", this.rebalanceListener.assignment.leader());
        Assert.assertEquals(Collections.singletonList(this.connectorId1), this.rebalanceListener.assignment.connectors());
        Assert.assertEquals(Collections.emptyList(), this.rebalanceListener.assignment.tasks());
        PowerMock.verifyAll();
    }

    @Test
    public void testNormalJoinGroupFollower() {
        EasyMock.expect(this.configStorage.snapshot()).andReturn(this.configState1);
        PowerMock.replayAll(new Object[0]);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest.2
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.memberId().equals("member") && syncGroupRequest.generationId() == 1 && syncGroupRequest.groupAssignment().isEmpty();
            }
        }, syncGroupResponse((short) 0, "leader", 1L, Collections.emptyList(), Collections.singletonList(this.taskId1x0), Errors.NONE));
        this.coordinator.ensureActiveGroup();
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(0L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertFalse(this.rebalanceListener.assignment.failed());
        Assert.assertEquals(1L, this.rebalanceListener.assignment.offset());
        Assert.assertEquals(Collections.emptyList(), this.rebalanceListener.assignment.connectors());
        Assert.assertEquals(Collections.singletonList(this.taskId1x0), this.rebalanceListener.assignment.tasks());
        PowerMock.verifyAll();
    }

    @Test
    public void testJoinLeaderCannotAssign() {
        EasyMock.expect(this.configStorage.snapshot()).andReturn(this.configState1);
        EasyMock.expect(this.configStorage.snapshot()).andReturn(this.configState2);
        PowerMock.replayAll(new Object[0]);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
        MockClient.RequestMatcher requestMatcher = new MockClient.RequestMatcher() { // from class: org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest.3
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.memberId().equals("member") && syncGroupRequest.generationId() == 1 && syncGroupRequest.groupAssignment().isEmpty();
            }
        };
        this.client.prepareResponse(requestMatcher, syncGroupResponse((short) 1, "leader", 10L, Collections.emptyList(), Collections.emptyList(), Errors.NONE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
        this.client.prepareResponse(requestMatcher, syncGroupResponse((short) 0, "leader", 1L, Collections.emptyList(), Collections.singletonList(this.taskId1x0), Errors.NONE));
        this.coordinator.ensureActiveGroup();
        PowerMock.verifyAll();
    }

    @Test
    public void testRejoinGroup() {
        EasyMock.expect(this.configStorage.snapshot()).andReturn(this.configState1);
        EasyMock.expect(this.configStorage.snapshot()).andReturn(this.configState1);
        PowerMock.replayAll(new Object[0]);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse((short) 0, "leader", 1L, Collections.emptyList(), Collections.singletonList(this.taskId1x0), Errors.NONE));
        this.coordinator.ensureActiveGroup();
        Assert.assertEquals(0L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertFalse(this.rebalanceListener.assignment.failed());
        Assert.assertEquals(1L, this.rebalanceListener.assignment.offset());
        Assert.assertEquals(Collections.emptyList(), this.rebalanceListener.assignment.connectors());
        Assert.assertEquals(Collections.singletonList(this.taskId1x0), this.rebalanceListener.assignment.tasks());
        this.coordinator.requestRejoin();
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse((short) 0, "leader", 1L, Collections.singletonList(this.connectorId1), Collections.emptyList(), Errors.NONE));
        this.coordinator.ensureActiveGroup();
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptyList(), this.rebalanceListener.revokedConnectors);
        Assert.assertEquals(Collections.singletonList(this.taskId1x0), this.rebalanceListener.revokedTasks);
        Assert.assertEquals(2L, this.rebalanceListener.assignedCount);
        Assert.assertFalse(this.rebalanceListener.assignment.failed());
        Assert.assertEquals(1L, this.rebalanceListener.assignment.offset());
        Assert.assertEquals(Collections.singletonList(this.connectorId1), this.rebalanceListener.assignment.connectors());
        Assert.assertEquals(Collections.emptyList(), this.rebalanceListener.assignment.tasks());
        PowerMock.verifyAll();
    }

    @Test
    public void testLeaderPerformAssignment1() throws Exception {
        EasyMock.expect(this.configStorage.snapshot()).andReturn(this.configState1);
        PowerMock.replayAll(new Object[0]);
        this.coordinator.metadata();
        HashMap hashMap = new HashMap();
        hashMap.put("leader", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)));
        hashMap.put("member", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)));
        Map map = (Map) Whitebox.invokeMethod(this.coordinator, "performAssignment", new Object[]{"leader", "default", hashMap});
        ConnectProtocol.Assignment deserializeAssignment = ConnectProtocol.deserializeAssignment((ByteBuffer) map.get("leader"));
        Assert.assertEquals(false, Boolean.valueOf(deserializeAssignment.failed()));
        Assert.assertEquals("leader", deserializeAssignment.leader());
        Assert.assertEquals(1L, deserializeAssignment.offset());
        Assert.assertEquals(Collections.singletonList(this.connectorId1), deserializeAssignment.connectors());
        Assert.assertEquals(Collections.emptyList(), deserializeAssignment.tasks());
        ConnectProtocol.Assignment deserializeAssignment2 = ConnectProtocol.deserializeAssignment((ByteBuffer) map.get("member"));
        Assert.assertEquals(false, Boolean.valueOf(deserializeAssignment2.failed()));
        Assert.assertEquals("leader", deserializeAssignment2.leader());
        Assert.assertEquals(1L, deserializeAssignment2.offset());
        Assert.assertEquals(Collections.emptyList(), deserializeAssignment2.connectors());
        Assert.assertEquals(Collections.singletonList(this.taskId1x0), deserializeAssignment2.tasks());
        PowerMock.verifyAll();
    }

    @Test
    public void testLeaderPerformAssignment2() throws Exception {
        EasyMock.expect(this.configStorage.snapshot()).andReturn(this.configState2);
        PowerMock.replayAll(new Object[0]);
        this.coordinator.metadata();
        HashMap hashMap = new HashMap();
        hashMap.put("leader", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)));
        hashMap.put("member", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)));
        Map map = (Map) Whitebox.invokeMethod(this.coordinator, "performAssignment", new Object[]{"leader", "default", hashMap});
        ConnectProtocol.Assignment deserializeAssignment = ConnectProtocol.deserializeAssignment((ByteBuffer) map.get("leader"));
        Assert.assertEquals(false, Boolean.valueOf(deserializeAssignment.failed()));
        Assert.assertEquals("leader", deserializeAssignment.leader());
        Assert.assertEquals(1L, deserializeAssignment.offset());
        Assert.assertEquals(Collections.singletonList(this.connectorId1), deserializeAssignment.connectors());
        Assert.assertEquals(Arrays.asList(this.taskId1x0, this.taskId2x0), deserializeAssignment.tasks());
        ConnectProtocol.Assignment deserializeAssignment2 = ConnectProtocol.deserializeAssignment((ByteBuffer) map.get("member"));
        Assert.assertEquals(false, Boolean.valueOf(deserializeAssignment2.failed()));
        Assert.assertEquals("leader", deserializeAssignment2.leader());
        Assert.assertEquals(1L, deserializeAssignment2.offset());
        Assert.assertEquals(Collections.singletonList(this.connectorId2), deserializeAssignment2.connectors());
        Assert.assertEquals(Collections.singletonList(this.taskId1x1), deserializeAssignment2.tasks());
        PowerMock.verifyAll();
    }

    @Test
    public void testLeaderPerformAssignmentSingleTaskConnectors() throws Exception {
        EasyMock.expect(this.configStorage.snapshot()).andReturn(this.configStateSingleTaskConnectors);
        PowerMock.replayAll(new Object[0]);
        this.coordinator.metadata();
        HashMap hashMap = new HashMap();
        hashMap.put("leader", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)));
        hashMap.put("member", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)));
        Map map = (Map) Whitebox.invokeMethod(this.coordinator, "performAssignment", new Object[]{"leader", "default", hashMap});
        ConnectProtocol.Assignment deserializeAssignment = ConnectProtocol.deserializeAssignment((ByteBuffer) map.get("leader"));
        Assert.assertEquals(false, Boolean.valueOf(deserializeAssignment.failed()));
        Assert.assertEquals("leader", deserializeAssignment.leader());
        Assert.assertEquals(1L, deserializeAssignment.offset());
        Assert.assertEquals(Arrays.asList(this.connectorId1, this.connectorId3), deserializeAssignment.connectors());
        Assert.assertEquals(Arrays.asList(this.taskId2x0), deserializeAssignment.tasks());
        ConnectProtocol.Assignment deserializeAssignment2 = ConnectProtocol.deserializeAssignment((ByteBuffer) map.get("member"));
        Assert.assertEquals(false, Boolean.valueOf(deserializeAssignment2.failed()));
        Assert.assertEquals("leader", deserializeAssignment2.leader());
        Assert.assertEquals(1L, deserializeAssignment2.offset());
        Assert.assertEquals(Collections.singletonList(this.connectorId2), deserializeAssignment2.connectors());
        Assert.assertEquals(Arrays.asList(this.taskId1x0, this.taskId3x0), deserializeAssignment2.tasks());
        PowerMock.verifyAll();
    }

    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors errors) {
        return new FindCoordinatorResponse(errors, node);
    }

    private JoinGroupResponse joinGroupLeaderResponse(int i, String str, Map<String, Long> map, Errors errors) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(entry.getKey(), entry.getValue().longValue())));
        }
        return new JoinGroupResponse(errors, i, "default", str, str, hashMap);
    }

    private JoinGroupResponse joinGroupFollowerResponse(int i, String str, String str2, Errors errors) {
        return new JoinGroupResponse(errors, i, "default", str, str2, Collections.emptyMap());
    }

    private SyncGroupResponse syncGroupResponse(short s, String str, long j, List<String> list, List<ConnectorTaskId> list2, Errors errors) {
        return new SyncGroupResponse(errors, ConnectProtocol.serializeAssignment(new ConnectProtocol.Assignment(s, str, LEADER_URL, j, list, list2)));
    }
}
