package io.nosqlbench.adapter.kafka;

import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.adapter.kafka.util.KafkaClientConf;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/nosqlbench/adapter/kafka/KafkaSpace.class */
public class KafkaSpace implements AutoCloseable {
    private static final Logger logger = LogManager.getLogger(KafkaSpace.class);
    private final String spaceName;
    private final NBConfiguration cfg;
    private final String bootstrapSvr;
    private final String kafkaClientConfFileName;
    private final KafkaClientConf kafkaClientConf;
    private final boolean strictMsgErrorHandling;
    private final long maxOpTimeInSec;
    private final int kafkaClntNum;
    private final int consumerGrpNum;
    private long totalCycleNum;
    private final ConcurrentHashMap<String, OpTimeTrackKafkaClient> opTimeTrackKafkaClients = new ConcurrentHashMap<>();
    private AtomicBoolean beingShutdown = new AtomicBoolean(false);
    private final long activityStartTimeMills = System.currentTimeMillis();

    public KafkaSpace(String str, NBConfiguration nBConfiguration) {
        this.spaceName = str;
        this.cfg = nBConfiguration;
        this.bootstrapSvr = (String) nBConfiguration.get("bootstrap_server");
        this.kafkaClntNum = NumberUtils.toInt((String) nBConfiguration.getOptional("num_clnt").orElse("1"));
        this.consumerGrpNum = NumberUtils.toInt((String) nBConfiguration.getOptional("num_cons_grp").orElse("1"));
        this.maxOpTimeInSec = NumberUtils.toLong((String) nBConfiguration.getOptional("max_op_time").orElse("0L"));
        this.strictMsgErrorHandling = BooleanUtils.toBoolean((String) nBConfiguration.getOptional("strict_msg_error_handling").orElse("false"));
        this.kafkaClientConfFileName = (String) nBConfiguration.get("config");
        this.kafkaClientConf = new KafkaClientConf(this.kafkaClientConfFileName);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        shutdownSpace();
    }

    public static NBConfigModel getConfigModel() {
        return ConfigModel.of(KafkaSpace.class).add(Param.defaultTo("bootstrap_server", "pulsar://localhost:9020").setDescription("Kafka bootstrap server URL.")).add(Param.defaultTo("config", "config.properties").setDescription("Kafka client connection configuration property file.")).add(Param.defaultTo("num_clnt", 1).setDescription("Number of Kafka clients. For consumer, this is the number of consumers per consumer group")).add(Param.defaultTo("num_cons_grp", 1).setDescription("Number of consumer groups (only relevant for Kafka consumer workload). ")).add(Param.defaultTo("max_op_time", 0).setDescription("Maximum time (in seconds) to run NB Kafka testing scenario.")).add(Param.defaultTo("strict_msg_error_handling", false).setDescription("Whether to do strict error handling which is to stop NB Kafka execution.")).asReadOnly();
    }

    public OpTimeTrackKafkaClient getOpTimeTrackKafkaClient(String str) {
        return this.opTimeTrackKafkaClients.get(str);
    }

    public void addOpTimeTrackKafkaClient(String str, OpTimeTrackKafkaClient opTimeTrackKafkaClient) {
        this.opTimeTrackKafkaClients.put(str, opTimeTrackKafkaClient);
    }

    public long getActivityStartTimeMills() {
        return this.activityStartTimeMills;
    }

    public long getMaxOpTimeInSec() {
        return this.maxOpTimeInSec;
    }

    public String getBootstrapSvr() {
        return this.bootstrapSvr;
    }

    public KafkaClientConf getKafkaClientConf() {
        return this.kafkaClientConf;
    }

    public int getKafkaClntNum() {
        return this.kafkaClntNum;
    }

    public int getConsumerGrpNum() {
        return this.consumerGrpNum;
    }

    public boolean isStrictMsgErrorHandling() {
        return this.strictMsgErrorHandling;
    }

    public long getTotalCycleNum() {
        return this.totalCycleNum;
    }

    public void setTotalCycleNum(long j) {
        this.totalCycleNum = j;
    }

    public boolean isShuttigDown() {
        return this.beingShutdown.get();
    }

    public void shutdownSpace() {
        try {
            this.beingShutdown.set(true);
            Iterator<OpTimeTrackKafkaClient> it = this.opTimeTrackKafkaClients.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            KafkaAdapterUtil.pauseCurThreadExec(5);
        } catch (Exception e) {
            e.printStackTrace();
            throw new KafkaAdapterUnexpectedException("Unexpected error when shutting down NB S4J space.");
        }
    }
}
