package org.apache.kafka.tools.consumer.group;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.chrono.ChronoZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import joptsimple.OptionException;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTemplate;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommand;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ClusterTestExtensions.class})
/* loaded from: input_file:org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.class */
public class ResetConsumerGroupOffsetTest {
    private static final String TOPIC_PREFIX = "foo-";
    private static final String GROUP_PREFIX = "test.group-";

    private static List<ClusterConfig> generator() {
        return ConsumerGroupCommandTestUtils.generator();
    }

    private String[] basicArgs(ClusterInstance clusterInstance) {
        return new String[]{"--reset-offsets", "--bootstrap-server", clusterInstance.bootstrapServers(), "--timeout", Long.toString(15000L)};
    }

    private String[] buildArgsForGroups(ClusterInstance clusterInstance, List<String> list, String... strArr) {
        ArrayList arrayList = new ArrayList(Arrays.asList(basicArgs(clusterInstance)));
        for (String str : list) {
            arrayList.add("--group");
            arrayList.add(str);
        }
        arrayList.addAll(Arrays.asList(strArr));
        return (String[]) arrayList.toArray(new String[0]);
    }

    private String[] buildArgsForGroup(ClusterInstance clusterInstance, String str, String... strArr) {
        return buildArgsForGroups(clusterInstance, Collections.singletonList(str), strArr);
    }

    private String[] buildArgsForAllGroups(ClusterInstance clusterInstance, String... strArr) {
        ArrayList arrayList = new ArrayList(Arrays.asList(basicArgs(clusterInstance)));
        arrayList.add("--all-groups");
        arrayList.addAll(Arrays.asList(strArr));
        return (String[]) arrayList.toArray(new String[0]);
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsNotExistingGroup(ClusterInstance clusterInstance) throws Exception {
        String generateRandomTopic = generateRandomTopic();
        String str = "missing.group";
        ConsumerGroupCommand.ConsumerGroupService consumerGroupService = getConsumerGroupService(buildArgsForGroup(clusterInstance, "missing.group", "--all-topics", "--to-current", "--execute"));
        Throwable th = null;
        try {
            try {
                TestUtils.waitForCondition(() -> {
                    return "localhost".equals(consumerGroupService.collectGroupState(str).coordinator.host());
                }, "Can't find a coordinator");
                Assertions.assertTrue(((Map) consumerGroupService.resetOffsets().get("missing.group")).isEmpty());
                Assertions.assertTrue(committedOffsets(clusterInstance, generateRandomTopic, "missing.group").isEmpty());
                if (consumerGroupService != null) {
                    if (0 == 0) {
                        consumerGroupService.close();
                        return;
                    }
                    try {
                        consumerGroupService.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (consumerGroupService != null) {
                if (th != null) {
                    try {
                        consumerGroupService.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    consumerGroupService.close();
                }
            }
            throw th4;
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsExistingTopic(ClusterInstance clusterInstance) {
        String generateRandomTopic = generateRandomTopic();
        String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, "new.group", "--topic", generateRandomTopic, "--to-offset", "50");
        produceMessages(clusterInstance, generateRandomTopic, 100);
        resetAndAssertOffsets(clusterInstance, buildArgsForGroup, 50L, true, Collections.singletonList(generateRandomTopic));
        resetAndAssertOffsets(clusterInstance, addTo(buildArgsForGroup, "--dry-run"), 50L, true, Collections.singletonList(generateRandomTopic));
        resetAndAssertOffsets(clusterInstance, addTo(buildArgsForGroup, "--execute"), 50L, false, Collections.singletonList(generateRandomTopic));
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsExistingTopicSelectedGroups(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomTopic = generateRandomTopic();
            produceMessages(clusterInstance, generateRandomTopic, 100);
            List<String> generateIds = generateIds(generateRandomTopic);
            for (String str : generateIds) {
                AutoCloseable consumerGroupClosable = consumerGroupClosable(clusterInstance, 1, generateRandomTopic, str, groupProtocol);
                Throwable th = null;
                try {
                    try {
                        awaitConsumerProgress(clusterInstance, generateRandomTopic, str, 100L);
                        if (consumerGroupClosable != null) {
                            if (0 != 0) {
                                try {
                                    consumerGroupClosable.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                consumerGroupClosable.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (consumerGroupClosable != null) {
                        if (th != null) {
                            try {
                                consumerGroupClosable.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            consumerGroupClosable.close();
                        }
                    }
                    throw th3;
                }
            }
            String[] buildArgsForGroups = buildArgsForGroups(clusterInstance, generateIds, "--topic", generateRandomTopic, "--to-offset", "50");
            resetAndAssertOffsets(clusterInstance, buildArgsForGroups, 50L, true, Collections.singletonList(generateRandomTopic));
            resetAndAssertOffsets(clusterInstance, addTo(buildArgsForGroups, "--dry-run"), 50L, true, Collections.singletonList(generateRandomTopic));
            resetAndAssertOffsets(clusterInstance, addTo(buildArgsForGroups, "--execute"), 50L, false, Collections.singletonList(generateRandomTopic));
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsExistingTopicAllGroups(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForAllGroups = buildArgsForAllGroups(clusterInstance, "--topic", generateRandomTopic, "--to-offset", "50");
            produceMessages(clusterInstance, generateRandomTopic, 100);
            for (int i = 1; i <= 3; i++) {
                String generateRandomGroupId = generateRandomGroupId();
                AutoCloseable consumerGroupClosable = consumerGroupClosable(clusterInstance, 1, generateRandomTopic, generateRandomGroupId, groupProtocol);
                Throwable th = null;
                try {
                    try {
                        awaitConsumerProgress(clusterInstance, generateRandomTopic, generateRandomGroupId, 100L);
                        if (consumerGroupClosable != null) {
                            if (0 != 0) {
                                try {
                                    consumerGroupClosable.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                consumerGroupClosable.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (consumerGroupClosable != null) {
                        if (th != null) {
                            try {
                                consumerGroupClosable.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            consumerGroupClosable.close();
                        }
                    }
                    throw th3;
                }
            }
            resetAndAssertOffsets(clusterInstance, buildArgsForAllGroups, 50L, true, Collections.singletonList(generateRandomTopic));
            resetAndAssertOffsets(clusterInstance, addTo(buildArgsForAllGroups, "--dry-run"), 50L, true, Collections.singletonList(generateRandomTopic));
            resetAndAssertOffsets(clusterInstance, addTo(buildArgsForAllGroups, "--execute"), 50L, false, Collections.singletonList(generateRandomTopic));
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsAllTopicsAllGroups(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForAllGroups = buildArgsForAllGroups(clusterInstance, "--all-topics", "--to-offset", "50");
            List<String> generateIds = generateIds(generateRandomGroupId);
            List<String> generateIds2 = generateIds(generateRandomTopic);
            generateIds.forEach(str -> {
                produceMessages(clusterInstance, str, 100);
            });
            for (String str2 : generateIds) {
                for (String str3 : generateIds2) {
                    AutoCloseable consumerGroupClosable = consumerGroupClosable(clusterInstance, 3, str2, str3, groupProtocol);
                    Throwable th = null;
                    try {
                        try {
                            awaitConsumerProgress(clusterInstance, str2, str3, 100L);
                            if (consumerGroupClosable != null) {
                                if (0 != 0) {
                                    try {
                                        consumerGroupClosable.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    consumerGroupClosable.close();
                                }
                            }
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (consumerGroupClosable != null) {
                            if (th != null) {
                                try {
                                    consumerGroupClosable.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                consumerGroupClosable.close();
                            }
                        }
                        throw th3;
                    }
                }
            }
            resetAndAssertOffsets(clusterInstance, buildArgsForAllGroups, 50L, true, generateIds);
            resetAndAssertOffsets(clusterInstance, addTo(buildArgsForAllGroups, "--dry-run"), 50L, true, generateIds);
            resetAndAssertOffsets(clusterInstance, addTo(buildArgsForAllGroups, "--execute"), 50L, false, generateIds);
            Admin createAdminClient = clusterInstance.createAdminClient();
            Throwable th5 = null;
            try {
                try {
                    createAdminClient.deleteConsumerGroups(generateIds2).all().get();
                    if (createAdminClient != null) {
                        if (0 != 0) {
                            try {
                                createAdminClient.close();
                            } catch (Throwable th6) {
                                th5.addSuppressed(th6);
                            }
                        } else {
                            createAdminClient.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th7) {
                if (createAdminClient != null) {
                    if (th5 != null) {
                        try {
                            createAdminClient.close();
                        } catch (Throwable th8) {
                            th5.addSuppressed(th8);
                        }
                    } else {
                        createAdminClient.close();
                    }
                }
                throw th7;
            }
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsToLocalDateTime(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--to-datetime", DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS").format(LocalDateTime.now().minusDays(1L)), "--execute");
            produceMessages(clusterInstance, generateRandomTopic, 100);
            AutoCloseable consumerGroupClosable = consumerGroupClosable(clusterInstance, 1, generateRandomTopic, generateRandomGroupId, groupProtocol);
            Throwable th = null;
            try {
                try {
                    awaitConsumerProgress(clusterInstance, generateRandomTopic, generateRandomGroupId, 100L);
                    if (consumerGroupClosable != null) {
                        if (0 != 0) {
                            try {
                                consumerGroupClosable.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            consumerGroupClosable.close();
                        }
                    }
                    resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 0L);
                } finally {
                }
            } catch (Throwable th3) {
                if (consumerGroupClosable != null) {
                    if (th != null) {
                        try {
                            consumerGroupClosable.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        consumerGroupClosable.close();
                    }
                }
                throw th3;
            }
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsToZonedDateTime(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            DateTimeFormatter ofPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
            produceMessages(clusterInstance, generateRandomTopic, 50);
            ChronoZonedDateTime<LocalDate> atZone = LocalDateTime.now().atZone(ZoneId.systemDefault());
            produceMessages(clusterInstance, generateRandomTopic, 50);
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--to-datetime", ofPattern.format(atZone), "--execute");
            AutoCloseable consumerGroupClosable = consumerGroupClosable(clusterInstance, 1, generateRandomTopic, generateRandomGroupId, groupProtocol);
            Throwable th = null;
            try {
                try {
                    awaitConsumerProgress(clusterInstance, generateRandomTopic, generateRandomGroupId, 100L);
                    if (consumerGroupClosable != null) {
                        if (0 != 0) {
                            try {
                                consumerGroupClosable.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            consumerGroupClosable.close();
                        }
                    }
                    resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 50L);
                } finally {
                }
            } catch (Throwable th3) {
                if (consumerGroupClosable != null) {
                    if (th != null) {
                        try {
                            consumerGroupClosable.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        consumerGroupClosable.close();
                    }
                }
                throw th3;
            }
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsByDuration(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--by-duration", "PT1M", "--execute");
            produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
            resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 0L);
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsByDurationToEarliest(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--by-duration", "PT0.1S", "--execute");
            produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
            resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 100L);
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsByDurationFallbackToLatestWhenNoRecords(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
        String generateRandomGroupId = generateRandomGroupId();
        String generateRandomTopic = generateRandomTopic();
        String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--topic", generateRandomTopic, "--by-duration", "PT1M", "--execute");
        Admin createAdminClient = clusterInstance.createAdminClient();
        Throwable th = null;
        try {
            try {
                createAdminClient.createTopics(Collections.singleton(new NewTopic(generateRandomTopic, 1, (short) 1))).all().get();
                resetAndAssertOffsets(clusterInstance, buildArgsForGroup, 0L, false, Collections.singletonList(generateRandomTopic));
                createAdminClient.deleteTopics(Collections.singleton(generateRandomTopic)).all().get();
                if (createAdminClient != null) {
                    if (0 == 0) {
                        createAdminClient.close();
                        return;
                    }
                    try {
                        createAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createAdminClient != null) {
                if (th != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createAdminClient.close();
                }
            }
            throw th4;
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsToEarliest(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--to-earliest", "--execute");
            produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
            resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 0L);
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsToLatest(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--to-latest", "--execute");
            produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
            produceMessages(clusterInstance, generateRandomTopic, 100);
            resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 200L);
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsToCurrentOffset(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--to-current", "--execute");
            produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
            produceMessages(clusterInstance, generateRandomTopic, 100);
            resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 100L);
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsToSpecificOffset(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--to-offset", "1", "--execute");
            produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
            resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 1L);
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsShiftPlus(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--shift-by", "50", "--execute");
            produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
            produceMessages(clusterInstance, generateRandomTopic, 100);
            resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 150L);
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsShiftMinus(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--shift-by", "-50", "--execute");
            produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
            produceMessages(clusterInstance, generateRandomTopic, 100);
            resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 50L);
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsShiftByLowerThanEarliest(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--shift-by", "-150", "--execute");
            produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
            produceMessages(clusterInstance, generateRandomTopic, 100);
            resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 0L);
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsShiftByHigherThanLatest(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--shift-by", "150", "--execute");
            produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
            produceMessages(clusterInstance, generateRandomTopic, 100);
            resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 200L);
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsToEarliestOnOneTopic(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--topic", generateRandomTopic, "--to-earliest", "--execute");
            produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
            resetAndAssertOffsets(clusterInstance, generateRandomTopic, buildArgsForGroup, 0L);
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsToEarliestOnOneTopicAndPartition(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--topic", generateRandomTopic + ":1", "--to-earliest", "--execute");
            Admin createAdminClient = clusterInstance.createAdminClient();
            Throwable th = null;
            try {
                ConsumerGroupCommand.ConsumerGroupService consumerGroupService = getConsumerGroupService(buildArgsForGroup);
                Throwable th2 = null;
                try {
                    try {
                        createAdminClient.createTopics(Collections.singleton(new NewTopic(generateRandomTopic, 2, (short) 1))).all().get();
                        produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 2, groupProtocol);
                        Map<TopicPartition, Long> committedOffsets = committedOffsets(clusterInstance, generateRandomTopic, generateRandomGroupId);
                        TopicPartition topicPartition = new TopicPartition(generateRandomTopic, 0);
                        TopicPartition topicPartition2 = new TopicPartition(generateRandomTopic, 1);
                        HashMap hashMap = new HashMap();
                        hashMap.put(topicPartition, committedOffsets.get(topicPartition));
                        hashMap.put(topicPartition2, 0L);
                        resetAndAssertOffsetsCommitted(clusterInstance, consumerGroupService, hashMap, generateRandomTopic);
                        createAdminClient.deleteTopics(Collections.singleton(generateRandomTopic)).all().get();
                        if (consumerGroupService != null) {
                            if (0 != 0) {
                                try {
                                    consumerGroupService.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                consumerGroupService.close();
                            }
                        }
                        if (createAdminClient != null) {
                            if (0 != 0) {
                                try {
                                    createAdminClient.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                createAdminClient.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (Throwable th5) {
                if (createAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createAdminClient.close();
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                        }
                    } else {
                        createAdminClient.close();
                    }
                }
                throw th5;
            }
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsToEarliestOnTopics(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String generateRandomTopic2 = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--topic", generateRandomTopic, "--topic", generateRandomTopic2, "--to-earliest", "--execute");
            Admin createAdminClient = clusterInstance.createAdminClient();
            Throwable th = null;
            try {
                ConsumerGroupCommand.ConsumerGroupService consumerGroupService = getConsumerGroupService(buildArgsForGroup);
                Throwable th2 = null;
                try {
                    try {
                        createAdminClient.createTopics(Arrays.asList(new NewTopic(generateRandomTopic, 1, (short) 1), new NewTopic(generateRandomTopic2, 1, (short) 1))).all().get();
                        produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
                        produceConsumeAndShutdown(clusterInstance, generateRandomTopic2, generateRandomGroupId, 1, groupProtocol);
                        TopicPartition topicPartition = new TopicPartition(generateRandomTopic, 0);
                        TopicPartition topicPartition2 = new TopicPartition(generateRandomTopic2, 0);
                        Map<TopicPartition, Long> offsetMap = toOffsetMap(resetOffsets(consumerGroupService).get(generateRandomGroupId));
                        HashMap hashMap = new HashMap();
                        hashMap.put(topicPartition, 0L);
                        hashMap.put(topicPartition2, 0L);
                        Assertions.assertEquals(hashMap, offsetMap);
                        Assertions.assertEquals(Collections.singletonMap(topicPartition, 0L), committedOffsets(clusterInstance, generateRandomTopic, generateRandomGroupId));
                        Assertions.assertEquals(Collections.singletonMap(topicPartition2, 0L), committedOffsets(clusterInstance, generateRandomTopic2, generateRandomGroupId));
                        createAdminClient.deleteTopics(Arrays.asList(generateRandomTopic, generateRandomTopic2)).all().get();
                        if (consumerGroupService != null) {
                            if (0 != 0) {
                                try {
                                    consumerGroupService.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                consumerGroupService.close();
                            }
                        }
                        if (createAdminClient != null) {
                            if (0 != 0) {
                                try {
                                    createAdminClient.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                createAdminClient.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th5) {
                    if (consumerGroupService != null) {
                        if (th2 != null) {
                            try {
                                consumerGroupService.close();
                            } catch (Throwable th6) {
                                th2.addSuppressed(th6);
                            }
                        } else {
                            consumerGroupService.close();
                        }
                    }
                    throw th5;
                }
            } catch (Throwable th7) {
                if (createAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createAdminClient.close();
                        } catch (Throwable th8) {
                            th.addSuppressed(th8);
                        }
                    } else {
                        createAdminClient.close();
                    }
                }
                throw th7;
            }
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsToEarliestOnTopicsAndPartitions(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String generateRandomTopic2 = generateRandomTopic();
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--topic", generateRandomTopic + ":1", "--topic", generateRandomTopic2 + ":1", "--to-earliest", "--execute");
            Admin createAdminClient = clusterInstance.createAdminClient();
            Throwable th = null;
            try {
                try {
                    ConsumerGroupCommand.ConsumerGroupService consumerGroupService = getConsumerGroupService(buildArgsForGroup);
                    Throwable th2 = null;
                    try {
                        try {
                            createAdminClient.createTopics(Arrays.asList(new NewTopic(generateRandomTopic, 2, (short) 1), new NewTopic(generateRandomTopic2, 2, (short) 1))).all().get();
                            produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 2, groupProtocol);
                            produceConsumeAndShutdown(clusterInstance, generateRandomTopic2, generateRandomGroupId, 2, groupProtocol);
                            Map<TopicPartition, Long> committedOffsets = committedOffsets(clusterInstance, generateRandomTopic, generateRandomGroupId);
                            Map<TopicPartition, Long> committedOffsets2 = committedOffsets(clusterInstance, generateRandomTopic2, generateRandomGroupId);
                            TopicPartition topicPartition = new TopicPartition(generateRandomTopic, 1);
                            TopicPartition topicPartition2 = new TopicPartition(generateRandomTopic2, 1);
                            Map<TopicPartition, Long> offsetMap = toOffsetMap(resetOffsets(consumerGroupService).get(generateRandomGroupId));
                            HashMap hashMap = new HashMap();
                            hashMap.put(topicPartition, 0L);
                            hashMap.put(topicPartition2, 0L);
                            Assertions.assertEquals(hashMap, offsetMap);
                            committedOffsets.put(topicPartition, 0L);
                            Assertions.assertEquals(committedOffsets, committedOffsets(clusterInstance, generateRandomTopic, generateRandomGroupId));
                            committedOffsets2.put(topicPartition2, 0L);
                            Assertions.assertEquals(committedOffsets2, committedOffsets(clusterInstance, generateRandomTopic2, generateRandomGroupId));
                            createAdminClient.deleteTopics(Arrays.asList(generateRandomTopic, generateRandomTopic2)).all().get();
                            if (consumerGroupService != null) {
                                if (0 != 0) {
                                    try {
                                        consumerGroupService.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                } else {
                                    consumerGroupService.close();
                                }
                            }
                            if (createAdminClient != null) {
                                if (0 != 0) {
                                    try {
                                        createAdminClient.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    createAdminClient.close();
                                }
                            }
                        } finally {
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (Throwable th5) {
                if (createAdminClient != null) {
                    if (th != null) {
                        try {
                            createAdminClient.close();
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                        }
                    } else {
                        createAdminClient.close();
                    }
                }
                throw th5;
            }
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsExportImportPlanSingleGroupArg(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            TopicPartition topicPartition = new TopicPartition(generateRandomTopic, 0);
            TopicPartition topicPartition2 = new TopicPartition(generateRandomTopic, 1);
            String[] buildArgsForGroup = buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--to-offset", "2", "--export");
            File tempFile = TestUtils.tempFile("reset", ".csv");
            Admin createAdminClient = clusterInstance.createAdminClient();
            Throwable th = null;
            try {
                ConsumerGroupCommand.ConsumerGroupService consumerGroupService = getConsumerGroupService(buildArgsForGroup);
                Throwable th2 = null;
                try {
                    try {
                        createAdminClient.createTopics(Collections.singleton(new NewTopic(generateRandomTopic, 2, (short) 1))).all().get();
                        produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 2, groupProtocol);
                        Map resetOffsets = consumerGroupService.resetOffsets();
                        writeContentToFile(tempFile, consumerGroupService.exportOffsetsToCsv(resetOffsets));
                        HashMap hashMap = new HashMap();
                        hashMap.put(topicPartition, 2L);
                        hashMap.put(topicPartition2, 2L);
                        Assertions.assertEquals(hashMap, toOffsetMap((Map) resetOffsets.get(generateRandomGroupId)));
                        ConsumerGroupCommand.ConsumerGroupService consumerGroupService2 = getConsumerGroupService(buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--from-file", tempFile.getCanonicalPath(), "--dry-run"));
                        Throwable th3 = null;
                        try {
                            try {
                                Assertions.assertEquals(hashMap, toOffsetMap((Map) consumerGroupService2.resetOffsets().get(generateRandomGroupId)));
                                if (consumerGroupService2 != null) {
                                    if (0 != 0) {
                                        try {
                                            consumerGroupService2.close();
                                        } catch (Throwable th4) {
                                            th3.addSuppressed(th4);
                                        }
                                    } else {
                                        consumerGroupService2.close();
                                    }
                                }
                                createAdminClient.deleteTopics(Collections.singleton(generateRandomTopic));
                                if (consumerGroupService != null) {
                                    if (0 != 0) {
                                        try {
                                            consumerGroupService.close();
                                        } catch (Throwable th5) {
                                            th2.addSuppressed(th5);
                                        }
                                    } else {
                                        consumerGroupService.close();
                                    }
                                }
                                if (createAdminClient != null) {
                                    if (0 != 0) {
                                        try {
                                            createAdminClient.close();
                                        } catch (Throwable th6) {
                                            th.addSuppressed(th6);
                                        }
                                    } else {
                                        createAdminClient.close();
                                    }
                                }
                            } finally {
                            }
                        } finally {
                        }
                    } finally {
                    }
                } catch (Throwable th7) {
                    if (consumerGroupService != null) {
                        if (th2 != null) {
                            try {
                                consumerGroupService.close();
                            } catch (Throwable th8) {
                                th2.addSuppressed(th8);
                            }
                        } else {
                            consumerGroupService.close();
                        }
                    }
                    throw th7;
                }
            } catch (Throwable th9) {
                if (createAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createAdminClient.close();
                        } catch (Throwable th10) {
                            th.addSuppressed(th10);
                        }
                    } else {
                        createAdminClient.close();
                    }
                }
                throw th9;
            }
        }
    }

    @ClusterTemplate("generator")
    public void testResetOffsetsExportImportPlan(ClusterInstance clusterInstance) throws Exception {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String generateRandomGroupId = generateRandomGroupId();
            String generateRandomGroupId2 = generateRandomGroupId();
            String generateRandomTopic = generateRandomTopic();
            String generateRandomTopic2 = generateRandomTopic();
            TopicPartition topicPartition = new TopicPartition(generateRandomTopic, 0);
            TopicPartition topicPartition2 = new TopicPartition(generateRandomTopic, 1);
            TopicPartition topicPartition3 = new TopicPartition(generateRandomTopic2, 0);
            TopicPartition topicPartition4 = new TopicPartition(generateRandomTopic2, 1);
            String[] buildArgsForGroups = buildArgsForGroups(clusterInstance, Arrays.asList(generateRandomGroupId, generateRandomGroupId2), "--all-topics", "--to-offset", "2", "--export");
            File tempFile = TestUtils.tempFile("reset", ".csv");
            Admin createAdminClient = clusterInstance.createAdminClient();
            Throwable th = null;
            try {
                ConsumerGroupCommand.ConsumerGroupService consumerGroupService = getConsumerGroupService(buildArgsForGroups);
                Throwable th2 = null;
                try {
                    createAdminClient.createTopics(Arrays.asList(new NewTopic(generateRandomTopic, 2, (short) 1), new NewTopic(generateRandomTopic2, 2, (short) 1))).all().get();
                    produceConsumeAndShutdown(clusterInstance, generateRandomTopic, generateRandomGroupId, 1, groupProtocol);
                    produceConsumeAndShutdown(clusterInstance, generateRandomTopic2, generateRandomGroupId2, 1, groupProtocol);
                    awaitConsumerGroupInactive(consumerGroupService, generateRandomGroupId);
                    awaitConsumerGroupInactive(consumerGroupService, generateRandomGroupId2);
                    Map resetOffsets = consumerGroupService.resetOffsets();
                    writeContentToFile(tempFile, consumerGroupService.exportOffsetsToCsv(resetOffsets));
                    HashMap hashMap = new HashMap();
                    hashMap.put(topicPartition, 2L);
                    hashMap.put(topicPartition2, 2L);
                    HashMap hashMap2 = new HashMap();
                    hashMap2.put(topicPartition3, 2L);
                    hashMap2.put(topicPartition4, 2L);
                    Assertions.assertEquals(hashMap, toOffsetMap((Map) resetOffsets.get(generateRandomGroupId)));
                    Assertions.assertEquals(hashMap2, toOffsetMap((Map) resetOffsets.get(generateRandomGroupId2)));
                    ConsumerGroupCommand.ConsumerGroupService consumerGroupService2 = getConsumerGroupService(buildArgsForGroups(clusterInstance, Arrays.asList(generateRandomGroupId, generateRandomGroupId2), "--all-topics", "--from-file", tempFile.getCanonicalPath(), "--dry-run"));
                    Throwable th3 = null;
                    try {
                        try {
                            Map resetOffsets2 = consumerGroupService2.resetOffsets();
                            Assertions.assertEquals(hashMap, toOffsetMap((Map) resetOffsets2.get(generateRandomGroupId)));
                            Assertions.assertEquals(hashMap2, toOffsetMap((Map) resetOffsets2.get(generateRandomGroupId2)));
                            if (consumerGroupService2 != null) {
                                if (0 != 0) {
                                    try {
                                        consumerGroupService2.close();
                                    } catch (Throwable th4) {
                                        th3.addSuppressed(th4);
                                    }
                                } else {
                                    consumerGroupService2.close();
                                }
                            }
                            consumerGroupService2 = getConsumerGroupService(buildArgsForGroup(clusterInstance, generateRandomGroupId, "--all-topics", "--from-file", tempFile.getCanonicalPath(), "--dry-run"));
                            Throwable th5 = null;
                            try {
                                try {
                                    Assertions.assertEquals(hashMap, toOffsetMap((Map) consumerGroupService2.resetOffsets().get(generateRandomGroupId)));
                                    if (consumerGroupService2 != null) {
                                        if (0 != 0) {
                                            try {
                                                consumerGroupService2.close();
                                            } catch (Throwable th6) {
                                                th5.addSuppressed(th6);
                                            }
                                        } else {
                                            consumerGroupService2.close();
                                        }
                                    }
                                    createAdminClient.deleteTopics(Arrays.asList(generateRandomTopic, generateRandomTopic2));
                                    if (consumerGroupService != null) {
                                        if (0 != 0) {
                                            try {
                                                consumerGroupService.close();
                                            } catch (Throwable th7) {
                                                th2.addSuppressed(th7);
                                            }
                                        } else {
                                            consumerGroupService.close();
                                        }
                                    }
                                    if (createAdminClient != null) {
                                        if (0 != 0) {
                                            try {
                                                createAdminClient.close();
                                            } catch (Throwable th8) {
                                                th.addSuppressed(th8);
                                            }
                                        } else {
                                            createAdminClient.close();
                                        }
                                    }
                                } finally {
                                }
                            } finally {
                            }
                        } finally {
                        }
                    } finally {
                    }
                } catch (Throwable th9) {
                    if (consumerGroupService != null) {
                        if (0 != 0) {
                            try {
                                consumerGroupService.close();
                            } catch (Throwable th10) {
                                th2.addSuppressed(th10);
                            }
                        } else {
                            consumerGroupService.close();
                        }
                    }
                    throw th9;
                }
            } catch (Throwable th11) {
                if (createAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createAdminClient.close();
                        } catch (Throwable th12) {
                            th.addSuppressed(th12);
                        }
                    } else {
                        createAdminClient.close();
                    }
                }
                throw th11;
            }
        }
    }

    @ClusterTemplate("generator")
    public void testResetWithUnrecognizedNewConsumerOption(ClusterInstance clusterInstance) {
        String[] strArr = {"--new-consumer", "--bootstrap-server", clusterInstance.bootstrapServers(), "--reset-offsets", "--group", generateRandomGroupId(), "--all-topics", "--to-offset", "2", "--export"};
        Assertions.assertThrows(OptionException.class, () -> {
            getConsumerGroupService(strArr);
        });
    }

    private String generateRandomTopic() {
        return TOPIC_PREFIX + TestUtils.randomString(10);
    }

    private String generateRandomGroupId() {
        return GROUP_PREFIX + TestUtils.randomString(10);
    }

    private Map<TopicPartition, Long> committedOffsets(ClusterInstance clusterInstance, String str, String str2) {
        try {
            Admin create = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()));
            Throwable th = null;
            try {
                try {
                    Map<TopicPartition, Long> map = (Map) ((Map) ((Map) create.listConsumerGroupOffsets(str2).all().get()).get(str2)).entrySet().stream().filter(entry -> {
                        return ((TopicPartition) entry.getKey()).topic().equals(str);
                    }).collect(Collectors.toMap((v0) -> {
                        return v0.getKey();
                    }, entry2 -> {
                        return Long.valueOf(((OffsetAndMetadata) entry2.getValue()).offset());
                    }));
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                    return map;
                } finally {
                }
            } catch (Throwable th3) {
                if (create != null) {
                    if (th != null) {
                        try {
                            create.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th3;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] strArr) {
        return new ConsumerGroupCommand.ConsumerGroupService(ConsumerGroupCommandOptions.fromArgs(strArr), Collections.singletonMap("retries", Integer.toString(Integer.MAX_VALUE)));
    }

    private void produceMessages(ClusterInstance clusterInstance, String str, int i) {
        produceMessages(clusterInstance, (List) IntStream.range(0, i).mapToObj(i2 -> {
            return new ProducerRecord(str, new byte[100000]);
        }).collect(Collectors.toList()));
    }

    private void produceMessages(ClusterInstance clusterInstance, List<ProducerRecord<byte[], byte[]>> list) {
        Producer<byte[], byte[]> createProducer = createProducer(clusterInstance);
        Throwable th = null;
        try {
            try {
                createProducer.getClass();
                list.forEach(createProducer::send);
                if (createProducer != null) {
                    if (0 == 0) {
                        createProducer.close();
                        return;
                    }
                    try {
                        createProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createProducer != null) {
                if (th != null) {
                    try {
                        createProducer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createProducer.close();
                }
            }
            throw th4;
        }
    }

    private Producer<byte[], byte[]> createProducer(ClusterInstance clusterInstance) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", clusterInstance.bootstrapServers());
        properties.put("acks", "1");
        properties.put("key.serializer", ByteArraySerializer.class.getName());
        properties.put("value.serializer", ByteArraySerializer.class.getName());
        return new KafkaProducer(properties);
    }

    private void resetAndAssertOffsets(ClusterInstance clusterInstance, String str, String[] strArr, long j) {
        resetAndAssertOffsets(clusterInstance, strArr, j, false, Collections.singletonList(str));
    }

    private void resetAndAssertOffsets(ClusterInstance clusterInstance, String[] strArr, long j, boolean z, List<String> list) {
        ConsumerGroupCommand.ConsumerGroupService consumerGroupService = getConsumerGroupService(strArr);
        Throwable th = null;
        try {
            Map<String, Map<TopicPartition, Long>> topicExceptOffsets = getTopicExceptOffsets(list, j);
            Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets = resetOffsets(consumerGroupService);
            for (String str : list) {
                resetOffsets.forEach((str2, map) -> {
                    Map<TopicPartition, Long> committedOffsets = committedOffsets(clusterInstance, str, str2);
                    Assertions.assertEquals(topicExceptOffsets.get(str), partitionToOffsets(str, map));
                    Assertions.assertEquals(z ? committedOffsets : topicExceptOffsets.get(str), committedOffsets(clusterInstance, str, str2));
                });
            }
            if (consumerGroupService != null) {
                if (0 == 0) {
                    consumerGroupService.close();
                    return;
                }
                try {
                    consumerGroupService.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (consumerGroupService != null) {
                if (0 != 0) {
                    try {
                        consumerGroupService.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    consumerGroupService.close();
                }
            }
            throw th3;
        }
    }

    private Map<String, Map<TopicPartition, Long>> getTopicExceptOffsets(List<String> list, long j) {
        return (Map) list.stream().collect(Collectors.toMap(Function.identity(), str -> {
            return Collections.singletonMap(new TopicPartition(str, 0), Long.valueOf(j));
        }));
    }

    private Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets(ConsumerGroupCommand.ConsumerGroupService consumerGroupService) {
        return consumerGroupService.resetOffsets();
    }

    private Map<TopicPartition, Long> partitionToOffsets(String str, Map<TopicPartition, OffsetAndMetadata> map) {
        return (Map) map.entrySet().stream().filter(entry -> {
            return Objects.equals(((TopicPartition) entry.getKey()).topic(), str);
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry2 -> {
            return Long.valueOf(((OffsetAndMetadata) entry2.getValue()).offset());
        }));
    }

    private static List<String> generateIds(String str) {
        return (List) IntStream.rangeClosed(1, 2).mapToObj(i -> {
            return str + i;
        }).collect(Collectors.toList());
    }

    private void produceConsumeAndShutdown(ClusterInstance clusterInstance, String str, String str2, int i, GroupProtocol groupProtocol) throws Exception {
        produceMessages(clusterInstance, str, 100);
        AutoCloseable consumerGroupClosable = consumerGroupClosable(clusterInstance, i, str, str2, groupProtocol);
        Throwable th = null;
        try {
            try {
                awaitConsumerProgress(clusterInstance, str, str2, 100L);
                if (consumerGroupClosable != null) {
                    if (0 == 0) {
                        consumerGroupClosable.close();
                        return;
                    }
                    try {
                        consumerGroupClosable.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (consumerGroupClosable != null) {
                if (th != null) {
                    try {
                        consumerGroupClosable.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    consumerGroupClosable.close();
                }
            }
            throw th4;
        }
    }

    private void writeContentToFile(File file, String str) throws IOException {
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(file));
        Throwable th = null;
        try {
            try {
                bufferedWriter.write(str);
                if (bufferedWriter != null) {
                    if (0 == 0) {
                        bufferedWriter.close();
                        return;
                    }
                    try {
                        bufferedWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (bufferedWriter != null) {
                if (th != null) {
                    try {
                        bufferedWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    bufferedWriter.close();
                }
            }
            throw th4;
        }
    }

    private AutoCloseable consumerGroupClosable(ClusterInstance clusterInstance, int i, String str, String str2, GroupProtocol groupProtocol) {
        Map<String, Object> composeConsumerConfigs = composeConsumerConfigs(clusterInstance, str2, groupProtocol);
        return ConsumerGroupCommandTestUtils.buildConsumers(i, false, str, () -> {
            return new KafkaConsumer(composeConsumerConfigs);
        });
    }

    private Map<String, Object> composeConsumerConfigs(ClusterInstance clusterInstance, String str, GroupProtocol groupProtocol) {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", clusterInstance.bootstrapServers());
        hashMap.put("group.id", str);
        hashMap.put("group.protocol", groupProtocol.name);
        hashMap.put("key.deserializer", StringDeserializer.class.getName());
        hashMap.put("value.deserializer", StringDeserializer.class.getName());
        hashMap.put("partition.assignment.strategy", RangeAssignor.class.getName());
        hashMap.put("auto.commit.interval.ms", 1000);
        hashMap.put("group.initial.rebalance.delay.ms", 1000);
        return hashMap;
    }

    private void awaitConsumerProgress(ClusterInstance clusterInstance, String str, String str2, long j) throws Exception {
        Admin create = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()));
        Throwable th = null;
        try {
            Supplier supplier = () -> {
                try {
                    return Long.valueOf(((Map) ((Map) create.listConsumerGroupOffsets(str2).all().get()).get(str2)).entrySet().stream().filter(entry -> {
                        return ((TopicPartition) entry.getKey()).topic().equals(str);
                    }).mapToLong(entry2 -> {
                        return ((OffsetAndMetadata) entry2.getValue()).offset();
                    }).sum());
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            };
            TestUtils.waitForCondition(() -> {
                return ((Long) supplier.get()).longValue() == j;
            }, "Expected that consumer group has consumed all messages from topic/partition. Expected offset: " + j + ". Actual offset: " + supplier.get());
            if (create != null) {
                if (0 == 0) {
                    create.close();
                    return;
                }
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    private void awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService consumerGroupService, String str) throws Exception {
        TestUtils.waitForCondition(() -> {
            ConsumerGroupState consumerGroupState = consumerGroupService.collectGroupState(str).state;
            return Objects.equals(consumerGroupState, ConsumerGroupState.EMPTY) || Objects.equals(consumerGroupState, ConsumerGroupState.DEAD);
        }, "Expected that consumer group is inactive. Actual state: " + consumerGroupService.collectGroupState(str).state);
    }

    private void resetAndAssertOffsetsCommitted(ClusterInstance clusterInstance, ConsumerGroupCommand.ConsumerGroupService consumerGroupService, Map<TopicPartition, Long> map, String str) {
        resetOffsets(consumerGroupService).forEach((str2, map2) -> {
            map2.forEach((topicPartition, offsetAndMetadata) -> {
                Assertions.assertEquals(offsetAndMetadata.offset(), (Long) map.get(topicPartition));
                Assertions.assertEquals(map, committedOffsets(clusterInstance, str, str2));
            });
        });
    }

    private Map<TopicPartition, Long> toOffsetMap(Map<TopicPartition, OffsetAndMetadata> map) {
        return (Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Long.valueOf(((OffsetAndMetadata) entry.getValue()).offset());
        }));
    }

    private String[] addTo(String[] strArr, String... strArr2) {
        ArrayList arrayList = new ArrayList(Arrays.asList(strArr));
        arrayList.addAll(Arrays.asList(strArr2));
        return (String[]) arrayList.toArray(new String[0]);
    }
}
