package org.apache.kafka.tiered.storage;

import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import kafka.log.UnifiedLog;
import kafka.server.KafkaBroker;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
import org.apache.kafka.tiered.storage.specs.TopicSpec;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
import scala.Option;
import scala.jdk.javaapi.CollectionConverters;

/* loaded from: input_file:org/apache/kafka/tiered/storage/TieredStorageTestContext.class */
/**
 * Shared context handed to the actions of a tiered storage test.
 *
 * <p>It wraps a {@link TieredStorageTestHarness} and exposes the clients (producer, consumer,
 * admin), the per-broker local and remote storage handles, the registry of created topic specs
 * and the test report that the actions operate on.
 *
 * <p>The client and storage references are {@code volatile} because they are replaced when
 * brokers are stopped or started. The topic spec registry is guarded by the monitor of the map
 * itself.
 */
public final class TieredStorageTestContext implements AutoCloseable {
    private final TieredStorageTestHarness harness;
    private final Serializer<String> ser = new StringSerializer();
    private final Deserializer<String> de = new StringDeserializer();
    /** Topic name to spec; every access must hold the monitor of this map. */
    private final Map<String, TopicSpec> topicSpecs = new HashMap<>();
    private final TieredStorageTestReport testReport = new TieredStorageTestReport(this);
    private volatile KafkaProducer<String, String> producer;
    private volatile Consumer<String, String> consumer;
    private volatile Admin admin;
    /** Remote storage managers of the brokers that were alive at the last {@link #initContext()}. */
    private volatile List<LocalTieredStorage> remoteStorageManagers;
    /** Local storages of the brokers that were alive at the last {@link #initContext()}. */
    private volatile List<BrokerLocalStorage> localStorages;

    /**
     * Creates a context bound to the given harness and immediately creates the clients and
     * captures the storage handles of the currently alive brokers.
     *
     * @param tieredStorageTestHarness the harness providing brokers and client factories
     */
    public TieredStorageTestContext(TieredStorageTestHarness tieredStorageTestHarness) {
        this.harness = tieredStorageTestHarness;
        initClients();
        initContext();
    }

    /** Creates the producer, consumer and admin client against the harness' current bootstrap servers. */
    private void initClients() {
        ListenerName listenerName = harness.listenerName();

        Properties commonOverrides = new Properties();
        commonOverrides.put("bootstrap.servers", harness.bootstrapServers(listenerName));

        // The producer lingers for 60 seconds; presumably so that batch boundaries are driven by
        // the explicit flushes issued in produce() rather than by time - confirm with test authors.
        Properties producerOverrides = new Properties();
        producerOverrides.put("linger.ms", String.valueOf(TimeUnit.SECONDS.toMillis(60L)));
        producerOverrides.putAll(commonOverrides);

        producer = harness.createProducer(ser, ser, producerOverrides);
        consumer = harness.createConsumer(de, de, commonOverrides,
                CollectionConverters.asScala(Collections.<String>emptyList()).toList());
        admin = harness.createAdminClient(listenerName, commonOverrides);
    }

    /** Re-captures the remote and local storage handles of the brokers that are alive right now. */
    private void initContext() {
        remoteStorageManagers = TieredStorageTestHarness.remoteStorageManagers(harness.aliveBrokers());
        localStorages = TieredStorageTestHarness.localStorages(harness.aliveBrokers());
    }

    /**
     * Creates the topic described by {@code topicSpec}, waits until all its partitions have
     * metadata on the brokers, and records the spec so it can be retrieved via
     * {@link #topicSpec(String)}.
     *
     * <p>An explicit replica assignment is used when the spec provides a non-empty one; otherwise
     * the topic is created from its partition count and replication factor.
     *
     * @param topicSpec the description of the topic to create
     * @throws ExecutionException if the admin request fails
     * @throws InterruptedException if interrupted while waiting for the admin request
     */
    public void createTopic(TopicSpec topicSpec) throws ExecutionException, InterruptedException {
        boolean hasAssignment = topicSpec.getAssignment() != null && !topicSpec.getAssignment().isEmpty();
        NewTopic newTopic = hasAssignment
                ? new NewTopic(topicSpec.getTopicName(), topicSpec.getAssignment())
                : new NewTopic(topicSpec.getTopicName(), topicSpec.getPartitionCount(),
                        (short) topicSpec.getReplicationFactor());
        newTopic.configs(topicSpec.getProperties());

        admin.createTopics(Collections.singletonList(newTopic)).all().get();
        TestUtils.waitForAllPartitionsMetadata(harness.brokers(), topicSpec.getTopicName(),
                topicSpec.getPartitionCount());

        // Must use the same monitor as topicSpec(String), otherwise the map is not actually guarded.
        synchronized (topicSpecs) {
            topicSpecs.put(topicSpec.getTopicName(), topicSpec);
        }
    }

    /**
     * Increases the partition count of a topic and waits until all partitions have metadata on
     * the brokers.
     *
     * <p>When the spec carries a non-empty assignment, its values are passed as the new replica
     * assignments, ordered by ascending key.
     *
     * @param expandPartitionCountSpec the topic name, target partition count and optional assignment
     * @throws ExecutionException if the admin request fails
     * @throws InterruptedException if interrupted while waiting for the admin request
     */
    public void createPartitions(ExpandPartitionCountSpec expandPartitionCountSpec)
            throws ExecutionException, InterruptedException {
        NewPartitions newPartitions;
        if (expandPartitionCountSpec.getAssignment() == null || expandPartitionCountSpec.getAssignment().isEmpty()) {
            newPartitions = NewPartitions.increaseTo(expandPartitionCountSpec.getPartitionCount());
        } else {
            List<List<Integer>> newAssignments = expandPartitionCountSpec.getAssignment().entrySet().stream()
                    .sorted(Map.Entry.comparingByKey())
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
            newPartitions = NewPartitions.increaseTo(expandPartitionCountSpec.getPartitionCount(), newAssignments);
        }

        admin.createPartitions(Collections.singletonMap(expandPartitionCountSpec.getTopicName(), newPartitions))
                .all().get();
        TestUtils.waitForAllPartitionsMetadata(harness.brokers(), expandPartitionCountSpec.getTopicName(),
                expandPartitionCountSpec.getPartitionCount());
    }

    /**
     * Incrementally alters the configuration of a topic.
     *
     * @param topicName the topic whose configuration is altered
     * @param configsToBeAdded config names and values to set
     * @param configsToBeDeleted config names to delete
     * @throws ExecutionException if the admin request fails
     * @throws InterruptedException if interrupted while waiting for the admin request
     * @throws TimeoutException if the admin request does not complete within 30 seconds
     */
    public void updateTopicConfig(String topicName,
                                  Map<String, String> configsToBeAdded,
                                  List<String> configsToBeDeleted)
            throws ExecutionException, InterruptedException, TimeoutException {
        updateResource(new ConfigResource(ConfigResource.Type.TOPIC, topicName),
                configsToBeAdded, configsToBeDeleted);
    }

    /**
     * Incrementally alters the configuration of a broker.
     *
     * @param brokerId the id of the broker whose configuration is altered
     * @param configsToBeAdded config names and values to set
     * @param configsToBeDeleted config names to delete
     * @throws ExecutionException if the admin request fails
     * @throws InterruptedException if interrupted while waiting for the admin request
     * @throws TimeoutException if the admin request does not complete within 30 seconds
     */
    public void updateBrokerConfig(Integer brokerId,
                                   Map<String, String> configsToBeAdded,
                                   List<String> configsToBeDeleted)
            throws ExecutionException, InterruptedException, TimeoutException {
        updateResource(new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString()),
                configsToBeAdded, configsToBeDeleted);
    }

    /**
     * Issues a single incremental alter-configs request for {@code configResource}: the DELETE
     * operations are listed first, followed by the SET operations.
     */
    private void updateResource(ConfigResource configResource,
                                Map<String, String> configsToBeAdded,
                                List<String> configsToBeDeleted)
            throws ExecutionException, InterruptedException, TimeoutException {
        List<AlterConfigOp> alterOps = new ArrayList<>();
        configsToBeDeleted.forEach(name ->
                alterOps.add(new AlterConfigOp(new ConfigEntry(name, ""), AlterConfigOp.OpType.DELETE)));
        configsToBeAdded.forEach((name, value) ->
                alterOps.add(new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET)));

        AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000);
        admin.incrementalAlterConfigs(Collections.singletonMap(configResource, alterOps), alterOptions)
                .all()
                .get(30L, TimeUnit.SECONDS);
    }

    /**
     * Deletes a topic through the admin client using the test utilities.
     *
     * @param topicName the topic to delete
     */
    public void deleteTopic(String topicName) {
        TestUtils.deleteTopicWithAdmin(admin, topicName, harness.brokers(), harness.controllerServers());
    }

    /**
     * Sends the given records in order, flushing the producer after every {@code batchSize}
     * records and once more after the last record.
     *
     * @param recordsToProduce the records to send
     * @param batchSize the number of records sent between two flushes; must not be zero
     */
    public void produce(List<ProducerRecord<String, String>> recordsToProduce, Integer batchSize) {
        int sentCount = 0;
        for (ProducerRecord<String, String> producerRecord : recordsToProduce) {
            producer.send(producerRecord);
            sentCount++;
            if (sentCount % batchSize == 0) {
                producer.flush();
            }
        }
        producer.flush();
    }

    /**
     * Assigns the consumer to {@code topicPartition}, seeks to {@code fetchOffset} and polls until
     * at least {@code expectedTotalCount} records have been received.
     *
     * @param topicPartition the partition to read from
     * @param expectedTotalCount the minimum number of records to wait for
     * @param fetchOffset the offset to start consuming from
     * @return every record received, which may be more than {@code expectedTotalCount}
     */
    public List<ConsumerRecord<String, String>> consume(TopicPartition topicPartition,
                                                        Integer expectedTotalCount,
                                                        Long fetchOffset) {
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seek(topicPartition, fetchOffset);

        final long timeoutMs = 60_000L;
        final String separator = System.lineSeparator();
        List<ConsumerRecord<String, String>> collected = new ArrayList<>();

        // NOTE(review): the trailing 100L is presumably the per-poll timeout in milliseconds -
        // confirm against the signature of TestUtils.pollRecordsUntilTrue.
        TestUtils.pollRecordsUntilTrue(
                consumer,
                polledRecords -> {
                    polledRecords.forEach(collected::add);
                    return collected.size() >= expectedTotalCount;
                },
                () -> String.format(
                        "Could not consume %d records of %s from offset %d in %d ms. %d message(s) consumed:%s%s",
                        expectedTotalCount, topicPartition, fetchOffset, timeoutMs, collected.size(), separator,
                        collected.stream().map(Object::toString).collect(Collectors.joining(separator))),
                timeoutMs,
                100L);
        return collected;
    }

    /**
     * Returns the consumer position after seeking to the end of the partition.
     *
     * <p>Side effect: the shared consumer is re-assigned to {@code topicPartition}.
     *
     * @param topicPartition the partition to inspect
     * @return the position of the consumer at the end of the partition
     */
    public Long nextOffset(TopicPartition topicPartition) {
        List<TopicPartition> partitions = Collections.singletonList(topicPartition);
        consumer.assign(partitions);
        consumer.seekToEnd(partitions);
        return consumer.position(topicPartition);
    }

    /**
     * Returns the consumer position after seeking to the beginning of the partition.
     *
     * <p>Side effect: the shared consumer is re-assigned to {@code topicPartition}.
     *
     * @param topicPartition the partition to inspect
     * @return the position of the consumer at the beginning of the partition
     */
    public Long beginOffset(TopicPartition topicPartition) {
        List<TopicPartition> partitions = Collections.singletonList(topicPartition);
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);
        return consumer.position(topicPartition);
    }

    /**
     * Kills and restarts a broker, then refreshes the storage handles.
     *
     * <p>If no broker was alive between the kill and the restart, the clients are recreated as
     * well (presumably because their bootstrap servers are no longer valid - confirm).
     *
     * @param brokerId the broker to bounce
     */
    public void bounce(int brokerId) {
        harness.killBroker(brokerId);
        boolean allBrokersDead = harness.aliveBrokers().isEmpty();
        harness.startBroker(brokerId);
        if (allBrokersDead) {
            reinitClients();
        }
        initContext();
    }

    /**
     * Kills a broker and refreshes the storage handles so they only cover alive brokers.
     *
     * @param brokerId the broker to stop
     */
    public void stop(int brokerId) {
        harness.killBroker(brokerId);
        initContext();
    }

    /**
     * Starts a broker and refreshes the storage handles. The clients are recreated when no broker
     * was alive before the start.
     *
     * @param brokerId the broker to start
     */
    public void start(int brokerId) {
        boolean allBrokersDead = harness.aliveBrokers().isEmpty();
        harness.startBroker(brokerId);
        if (allBrokersDead) {
            reinitClients();
        }
        initContext();
    }

    /**
     * Erases the files of a broker's local storage that match {@code filenameFilter}.
     *
     * <p>The storage is always looked up by broker id. When {@code isStopped} is {@code true} the
     * lookup covers the storages of all the harness' brokers; otherwise it covers the storages
     * captured for the alive brokers. The list of alive brokers is not indexed by broker id (it
     * shrinks whenever a broker is stopped), so a positional lookup would select the wrong broker.
     *
     * @param brokerId the id of the broker whose local storage is erased
     * @param filenameFilter selects what is erased; passed through to the local storage
     * @param isStopped whether to search among all brokers rather than only the alive ones
     * @throws IOException if erasing the storage fails
     * @throws IllegalArgumentException if no local storage exists for {@code brokerId}
     */
    public void eraseBrokerStorage(int brokerId, FilenameFilter filenameFilter, boolean isStopped)
            throws IOException {
        List<BrokerLocalStorage> candidates = isStopped
                ? TieredStorageTestHarness.localStorages(harness.brokers())
                : localStorages;
        BrokerLocalStorage brokerLocalStorage = candidates.stream()
                .filter(storage -> storage.getBrokerId() == brokerId)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("No local storage found for broker " + brokerId));
        brokerLocalStorage.eraseStorage(filenameFilter);
    }

    /**
     * Returns the spec recorded by {@link #createTopic(TopicSpec)} for the given topic.
     *
     * @param topicName the name of the topic
     * @return the recorded spec, or {@code null} if none was recorded under that name
     */
    public TopicSpec topicSpec(String topicName) {
        synchronized (topicSpecs) {
            return topicSpecs.get(topicName);
        }
    }

    /**
     * Takes a snapshot of the tiered storage through the remote storage manager of the first
     * alive broker.
     *
     * @return the snapshot
     */
    public LocalTieredStorageSnapshot takeTieredStorageSnapshot() {
        int aliveBrokerId = harness.aliveBrokers().head().config().brokerId();
        return LocalTieredStorageSnapshot.takeSnapshot(remoteStorageManager(aliveBrokerId));
    }

    /**
     * Returns the history held by the remote storage manager of the given broker.
     *
     * @param brokerId the id of the broker
     * @return the history of that broker's remote storage manager
     * @throws IllegalArgumentException if no remote storage manager is known for {@code brokerId}
     */
    public LocalTieredStorageHistory tieredStorageHistory(int brokerId) {
        return remoteStorageManager(brokerId).getHistory();
    }

    /**
     * Finds the remote storage manager of the given broker among those captured for the alive
     * brokers.
     *
     * @param brokerId the id of the broker
     * @return the matching remote storage manager
     * @throws IllegalArgumentException if no remote storage manager is known for {@code brokerId}
     */
    public LocalTieredStorage remoteStorageManager(int brokerId) {
        for (LocalTieredStorage candidate : remoteStorageManagers) {
            if (candidate.brokerId() == brokerId) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("No remote storage manager found for broker " + brokerId);
    }

    /**
     * Returns the leader epoch cache of the log of {@code topicPartition} on the given broker.
     *
     * @param brokerId passed to {@link #log(Integer, TopicPartition)} to select the broker
     * @param topicPartition the partition whose log is inspected
     * @return the cache, or empty if the broker has no log for the partition or the log has no cache
     */
    public Optional<LeaderEpochFileCache> leaderEpochFileCache(int brokerId, TopicPartition topicPartition) {
        return log(brokerId, topicPartition).map(UnifiedLog::leaderEpochCache);
    }

    /** @return the remote storage managers captured for the alive brokers */
    public List<LocalTieredStorage> remoteStorageManagers() {
        return remoteStorageManagers;
    }

    /** @return the local storages captured for the alive brokers */
    public List<BrokerLocalStorage> localStorages() {
        return localStorages;
    }

    /** @return the string deserializer used by the consumer of this context */
    public Deserializer<String> de() {
        return de;
    }

    /** @return the admin client currently held by this context */
    public Admin admin() {
        return admin;
    }

    /**
     * Tells whether a broker with the given id is among the harness' alive brokers.
     *
     * @param brokerId the id of the broker
     * @return {@code true} if an alive broker has that id
     */
    public boolean isActive(Integer brokerId) {
        return harness.aliveBrokers().exists(broker -> broker.config().brokerId() == brokerId);
    }

    /**
     * Tells whether the given broker is one of the replicas of {@code topicPartition} according
     * to the topic description returned by the admin client.
     *
     * @param topicPartition the partition to inspect
     * @param replicaId the id of the broker to look for
     * @return {@code true} if the broker is listed among the partition's replicas
     * @throws ExecutionException if the describe request fails
     * @throws InterruptedException if interrupted while waiting for the describe request
     */
    public boolean isAssignedReplica(TopicPartition topicPartition, Integer replicaId)
            throws ExecutionException, InterruptedException {
        String topic = topicPartition.topic();
        TopicDescription description =
                admin.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic);
        TopicPartitionInfo partitionInfo = description.partitions().get(topicPartition.partition());
        return partitionInfo.replicas().stream().anyMatch(node -> node.id() == replicaId);
    }

    /**
     * Returns the log of {@code topicPartition} held by a broker.
     *
     * <p>NOTE(review): {@code brokerId} is used as a position in {@code harness.brokers()}; this
     * assumes broker ids coincide with positions in that sequence - confirm against the harness.
     *
     * @param brokerId selects the broker in the harness' broker sequence
     * @param topicPartition the partition whose log is requested
     * @return the log, or empty if the broker's log manager has none for the partition
     */
    public Optional<UnifiedLog> log(Integer brokerId, TopicPartition topicPartition) {
        Option<UnifiedLog> maybeLog = harness.brokers().apply(brokerId).logManager().getLog(topicPartition, false);
        if (maybeLog.isDefined()) {
            return Optional.of(maybeLog.get());
        }
        return Optional.empty();
    }

    /**
     * Records a successful action in the test report.
     *
     * @param tieredStorageTestAction the action that succeeded
     */
    public void succeed(TieredStorageTestAction tieredStorageTestAction) {
        testReport.addSucceeded(tieredStorageTestAction);
    }

    /**
     * Records a failed action in the test report.
     *
     * @param tieredStorageTestAction the action that failed
     */
    public void fail(TieredStorageTestAction tieredStorageTestAction) {
        testReport.addFailed(tieredStorageTestAction);
    }

    /**
     * Prints the test report to the given stream.
     *
     * @param printStream the destination of the report
     */
    public void printReport(PrintStream printStream) {
        testReport.print(printStream);
    }

    /**
     * Does nothing: the clients created by this context are not closed here.
     *
     * <p>NOTE(review): presumably the harness that created the clients is responsible for closing
     * them on tear-down - confirm before adding explicit closes.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
    }

    /** Quietly closes the current producer, consumer and admin client, then creates new ones. */
    private void reinitClients() {
        Utils.closeQuietly(producer, "Producer client");
        Utils.closeQuietly(consumer, "Consumer client");
        Utils.closeQuietly(admin, "Admin client");
        initClients();
    }
}
