package io.nosqlbench.adapter.kafka.dispensers;

import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.class */
/**
 * Op dispenser that builds {@link KafkaOp} instances backed by a Kafka consumer.
 *
 * <p>The consumer configuration is seeded from the {@link KafkaSpace} client configuration and
 * then adjusted with the bootstrap servers and the commit mode. Consumers are looked up in, and
 * registered with, the {@link KafkaSpace} under a cache key derived from the cycle number, the
 * topic list and the consumer group id.
 */
public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
    private static final Logger logger = LogManager.getLogger("MessageConsumerOpDispenser");

    private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
    private static final String AUTO_COMMIT_KEY = "enable.auto.commit";
    private static final String GROUP_ID_KEY = "group.id";

    /** Consumer client settings handed to every consumer created by this dispenser. */
    private final Map<String, String> consumerClientConfMap;

    /** Value of the {@code msg_poll_interval} op setting; {@code 0} when it is not set. */
    protected final int msgPollIntervalInSec;

    /** Value of the {@code manual_commit_batch_num} op setting; a positive value disables auto commit. */
    protected final int maxMsgCntPerCommit;

    /** Whether the created consumers rely on Kafka's auto commit. */
    protected boolean autoCommitEnabled;

    /**
     * Builds the consumer configuration and resolves the commit mode.
     *
     * <p>Auto commit defaults to {@code true}. A positive {@code manual_commit_batch_num} forces
     * it off (and writes {@code enable.auto.commit=false} into the client configuration);
     * otherwise an explicit {@code enable.auto.commit} entry in the client configuration wins.
     *
     * @param driverAdapter the adapter passed through to the base dispenser
     * @param parsedOp the op template passed through to the base dispenser
     * @param longFunction the per-cycle string function passed through to the base dispenser
     * @param kafkaSpace the space providing the client configuration and the consumer cache
     */
    public MessageConsumerOpDispenser(DriverAdapter driverAdapter, ParsedOp parsedOp, LongFunction<String> longFunction, KafkaSpace kafkaSpace) {
        super(driverAdapter, parsedOp, longFunction, kafkaSpace);

        this.consumerClientConfMap = new HashMap<>();
        this.consumerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getConsumerConfMap());
        this.consumerClientConfMap.put(BOOTSTRAP_SERVERS_KEY, kafkaSpace.getBootstrapSvr());

        this.msgPollIntervalInSec = NumberUtils.toInt((String) this.parsedOp.getStaticConfigOr("msg_poll_interval", "0"));
        this.maxMsgCntPerCommit = NumberUtils.toInt(this.parsedOp.getStaticConfig("manual_commit_batch_num", String.class));

        if (this.maxMsgCntPerCommit > 0) {
            // Manual commit batching and auto commit are mutually exclusive.
            this.autoCommitEnabled = false;
            this.consumerClientConfMap.put(AUTO_COMMIT_KEY, "false");
        } else if (this.consumerClientConfMap.containsKey(AUTO_COMMIT_KEY)) {
            this.autoCommitEnabled = BooleanUtils.toBoolean(this.consumerClientConfMap.get(AUTO_COMMIT_KEY));
        } else {
            this.autoCommitEnabled = true;
        }
    }

    /**
     * Derives the consumer group id for a cycle.
     *
     * @param j the cycle number
     * @return the configured {@code group.id} (or the adapter's default prefix when none is
     *     configured) followed by {@code "-"} and {@code j % consumerGrpCnt}
     */
    private String getEffectiveGroupId(long j) {
        int groupIndex = (int) (j % this.consumerGrpCnt);
        String groupPrefix = this.consumerClientConfMap.getOrDefault(GROUP_ID_KEY, KafkaAdapterUtil.DFT_CONSUMER_GROUP_NAME_PREFIX);
        return groupPrefix + "-" + groupIndex;
    }

    /**
     * Returns the cached consumer wrapper for the given cycle, topics and group, creating and
     * registering a new one when the space has none under the computed cache key.
     *
     * @param j the cycle number; {@code j % kafkaClntCnt} selects the client slot
     * @param list the topic names to subscribe to
     * @param str the consumer group id
     * @return the cached or newly created consumer wrapper
     */
    private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaConsumer(long j, List<String> list, String str) {
        String cacheKey = KafkaAdapterUtil.buildCacheKey("consumer-" + (j % this.kafkaClntCnt), String.join("::", list), str);

        OpTimeTrackKafkaClient cachedClient = this.kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
        if (cachedClient != null) {
            return cachedClient;
        }

        Properties consumerProps = new Properties();
        consumerProps.putAll(this.consumerClientConfMap);
        consumerProps.put(GROUP_ID_KEY, str);

        // NOTE(review): raw type kept on purpose; the type arguments expected by
        // OpTimeTrackKafkaConsumer are not visible from this file -- confirm and parameterize.
        KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerProps);
        try {
            synchronized (this) {
                kafkaConsumer.subscribe(list);
            }
        } catch (RuntimeException subscribeFailure) {
            // The consumer is not yet registered anywhere, so nobody else would ever close it.
            try {
                kafkaConsumer.close();
            } catch (RuntimeException closeFailure) {
                subscribeFailure.addSuppressed(closeFailure);
            }
            throw subscribeFailure;
        }

        logger.debug("Kafka consumer created: {}/{} -- {}, {}, {}", cacheKey, kafkaConsumer, list, this.autoCommitEnabled, this.maxMsgCntPerCommit);

        OpTimeTrackKafkaClient createdClient = new OpTimeTrackKafkaConsumer(this.kafkaSpace, this.asyncAPI, this.msgPollIntervalInSec, this.autoCommitEnabled, this.maxMsgCntPerCommit, kafkaConsumer);
        this.kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, createdClient);
        return createdClient;
    }

    /**
     * Resolves the topic names for a cycle by splitting the topic-name string on commas and
     * dropping blank entries.
     *
     * @param j the cycle number
     * @return an unmodifiable list of non-blank topic names; empty when the topic-name string is
     *     null or blank and assertions are disabled
     * @throws AssertionError if assertions are enabled and the topic-name string is blank
     */
    protected List<String> getEffectiveTopicNameList(long j) {
        String topicNames = this.topicNameStrFunc.apply(j);
        assert StringUtils.isNotBlank(topicNames);

        // With assertions off, a null string would otherwise hit Arrays.stream(null) and fail
        // with a bare NullPointerException; an empty list lets the caller report the problem.
        if (StringUtils.isBlank(topicNames)) {
            return List.of();
        }
        return Arrays.stream(StringUtils.split(topicNames, ','))
            .filter(StringUtils::isNotBlank)
            .toList();
    }

    /**
     * Creates the consumer op for a cycle.
     *
     * <p>NOTE(review): the decompiler reports this method was renamed from {@code apply}
     * (merged with bridge methods); confirm the intended name against the base class.
     *
     * @param j the cycle number
     * @return a {@link KafkaOp} wrapping the consumer selected for this cycle
     * @throws KafkaAdapterInvalidParamException if no topic name or no group id could be resolved
     */
    public KafkaOp m8apply(long j) {
        List<String> topicNames = getEffectiveTopicNameList(j);
        String groupId = getEffectiveGroupId(j);
        if (topicNames.isEmpty() || StringUtils.isBlank(groupId)) {
            throw new KafkaAdapterInvalidParamException("Effective consumer group name and/or topic names  are needed for creating a consumer!");
        }
        OpTimeTrackKafkaClient consumerClient = getOrCreateOpTimeTrackKafkaConsumer(j, topicNames, groupId);
        return new KafkaOp(this.kafkaAdapterMetrics, this.kafkaSpace, consumerClient, null);
    }
}
