package io.nosqlbench.adapter.kafka.dispensers;

import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.LongFunction;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.class */
public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
    private static final Logger logger = LogManager.getLogger("MessageProducerOpDispenser");
    public static final String MSG_HEADER_OP_PARAM = "msg_header";
    public static final String MSG_KEY_OP_PARAM = "msg_key";
    public static final String MSG_BODY_OP_PARAM = "msg_body";
    private final Map<String, String> producerClientConfMap;
    protected final int txnBatchNum;
    private final LongFunction<String> msgHeaderJsonStrFunc;
    private final LongFunction<String> msgKeyStrFunc;
    private final LongFunction<String> msgValueStrFunc;

    public MessageProducerOpDispenser(DriverAdapter driverAdapter, ParsedOp parsedOp, LongFunction<String> longFunction, KafkaSpace kafkaSpace) {
        super(driverAdapter, parsedOp, longFunction, kafkaSpace);
        this.producerClientConfMap = new HashMap();
        this.producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap());
        this.producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr());
        this.txnBatchNum = ((Integer) this.parsedOp.getStaticConfigOr("txn_batch_num", 0)).intValue();
        this.msgHeaderJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
        this.msgKeyStrFunc = lookupOptionalStrOpValueFunc(MSG_KEY_OP_PARAM);
        this.msgValueStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
    }

    private String getEffectiveClientId(long j) {
        return this.producerClientConfMap.containsKey("client.id") ? this.producerClientConfMap.get("client.id") + "-" + ((int) (j % this.kafkaClntCnt)) : "";
    }

    private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaProducer(long j, String str, String str2) {
        String buildCacheKey = KafkaAdapterUtil.buildCacheKey("producer-" + String.valueOf(j % this.kafkaClntCnt), str);
        OpTimeTrackKafkaClient opTimeTrackKafkaClient = this.kafkaSpace.getOpTimeTrackKafkaClient(buildCacheKey);
        if (opTimeTrackKafkaClient == null) {
            Properties properties = new Properties();
            properties.putAll(this.producerClientConfMap);
            if (StringUtils.isNotBlank(str2)) {
                properties.put("client.id", str2);
            } else {
                properties.remove("client.id");
            }
            if (this.txnBatchNum < 2) {
                properties.remove("transactional.id");
            }
            boolean z = false;
            if (properties.containsKey("transactional.id")) {
                properties.put("transactional.id", properties.get("transactional.id").toString() + "-" + buildCacheKey);
                z = StringUtils.isNotBlank(properties.get("transactional.id").toString());
            }
            KafkaProducer kafkaProducer = new KafkaProducer(properties);
            if (z) {
                kafkaProducer.initTransactions();
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Producer created: {}/{} -- ({}, {}, {})", buildCacheKey, kafkaProducer, str, Boolean.valueOf(z), str2);
            }
            opTimeTrackKafkaClient = new OpTimeTrackKafkaProducer(this.kafkaSpace, this.asyncAPI, z, this.txnBatchNum, kafkaProducer);
            this.kafkaSpace.addOpTimeTrackKafkaClient(buildCacheKey, opTimeTrackKafkaClient);
        }
        return opTimeTrackKafkaClient;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private ProducerRecord<String, String> createKafkaMessage(long j, String str, String str2, String str3, String str4) {
        if (StringUtils.isAllBlank(new CharSequence[]{str3, str4})) {
            throw new KafkaAdapterInvalidParamException("Message key and value can't both be empty!");
        }
        int strObjSize = KafkaAdapterUtil.getStrObjSize(str3) + KafkaAdapterUtil.getStrObjSize(str4);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(str, str3, str4);
        Map hashMap = new HashMap();
        if (!StringUtils.isBlank(str2)) {
            try {
                hashMap = KafkaAdapterUtil.convertJsonToMap(str2);
            } catch (Exception e) {
                logger.warn("Error parsing message property JSON string {}, ignore message properties!", str2);
            }
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            String str5 = (String) entry.getKey();
            String str6 = (String) entry.getValue();
            strObjSize += KafkaAdapterUtil.getStrObjSize(str5) + KafkaAdapterUtil.getStrObjSize(str6);
            if (!StringUtils.isAnyBlank(new CharSequence[]{str5, str6})) {
                producerRecord.headers().add(str5, str6.getBytes());
            }
        }
        int strObjSize2 = strObjSize + KafkaAdapterUtil.getStrObjSize(KafkaAdapterUtil.NB_MSG_SEQ_PROP) + 8 + KafkaAdapterUtil.getStrObjSize(KafkaAdapterUtil.NB_MSG_SIZE_PROP) + 6;
        producerRecord.headers().add(KafkaAdapterUtil.NB_MSG_SEQ_PROP, String.valueOf(j).getBytes());
        producerRecord.headers().add(KafkaAdapterUtil.NB_MSG_SIZE_PROP, String.valueOf(strObjSize2).getBytes());
        return producerRecord;
    }

    /* renamed from: apply, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
    public KafkaOp m11apply(long j) {
        String apply = this.topicNameStrFunc.apply(j);
        return new KafkaOp(this.kafkaAdapterMetrics, this.kafkaSpace, getOrCreateOpTimeTrackKafkaProducer(j, apply, getEffectiveClientId(j)), createKafkaMessage(j, apply, this.msgHeaderJsonStrFunc.apply(j), this.msgKeyStrFunc.apply(j), this.msgValueStrFunc.apply(j)));
    }
}
