package io.nosqlbench.adapter.kafka.ops;

import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.class */
/**
 * Kafka producer client that publishes one {@link ProducerRecord} per cycle and,
 * when transactions are enabled, groups the sends into transactions.
 *
 * <p>Transactions are only used when they are enabled in the configuration
 * <em>and</em> the configured batch number is greater than 2. The number of
 * messages sent and the outcome of the latest transaction step are tracked
 * in static thread-local state, i.e. per calling thread.
 */
public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
    private static final Logger logger = LogManager.getLogger("OpTimeTrackKafkaProducer");

    /** Number of messages sent by the current thread; starts at 0. */
    private static final ThreadLocal<Integer> txnBatchTrackingCntTL = ThreadLocal.withInitial(() -> 0);

    /** Outcome of the latest transaction step on the current thread; starts as SUCCESS. */
    private static final ThreadLocal<TxnProcResult> txnProcResultTL =
        ThreadLocal.withInitial(() -> TxnProcResult.SUCCESS);

    private final boolean transactionEnabled;
    private final boolean asyncMsgAck;
    private final boolean transactEnabledConfig;
    private final int txnBatchNum;
    private final KafkaProducer<String, String> producer;

    /** Classification of the outcome of a begin/commit transaction step. */
    public enum TxnProcResult {
        SUCCESS,
        RECOVERABLE_ERROR,
        FATAL_ERROR,
        UNKNOWN_ERROR
    }

    /**
     * @param kafkaSpace            the space this client belongs to
     * @param asyncMsgAck           {@code true} to not wait for the send acknowledgement
     * @param transactEnabledConfig whether transactions are enabled in the configuration
     * @param txnBatchNum           configured transaction batch number; transactions are
     *                              only effective when this is greater than 2
     * @param producer              the Kafka producer used for sending
     */
    public OpTimeTrackKafkaProducer(
        KafkaSpace kafkaSpace,
        boolean asyncMsgAck,
        boolean transactEnabledConfig,
        int txnBatchNum,
        KafkaProducer<String, String> producer) {
        super(kafkaSpace);
        this.asyncMsgAck = asyncMsgAck;
        this.transactEnabledConfig = transactEnabledConfig;
        this.txnBatchNum = txnBatchNum;
        this.transactionEnabled = transactEnabledConfig && txnBatchNum > 2;
        this.producer = producer;
    }

    /** @return the current thread's sent-message counter */
    public static int getTxnBatchTrackingCntTL() {
        return txnBatchTrackingCntTL.get();
    }

    /** Increments the current thread's sent-message counter by one. */
    public static void incTxnBatchTrackingCnt() {
        txnBatchTrackingCntTL.set(getTxnBatchTrackingCntTL() + 1);
    }

    /** Resets the current thread's sent-message counter to 0. */
    public static void resetTxnBatchTrackingCnt() {
        txnBatchTrackingCntTL.set(0);
    }

    /** @return the outcome of the latest transaction step on the current thread */
    public static TxnProcResult getTxnProcResultTL() {
        return txnProcResultTL.get();
    }

    /** Records the outcome of the latest transaction step for the current thread. */
    public static void setTxnProcResultTL(TxnProcResult txnProcResult) {
        txnProcResultTL.set(txnProcResult);
    }

    /**
     * Resets the current thread's transaction outcome to {@link TxnProcResult#SUCCESS}.
     *
     * @param txnProcResult ignored; kept only so existing callers keep compiling
     */
    public static void resetTxnProcResultTL(TxnProcResult txnProcResult) {
        txnProcResultTL.set(TxnProcResult.SUCCESS);
    }

    /** Emits a debug line carrying the cycle, producer and transaction settings/counter. */
    private void logTxnDebug(String message, long cycle, KafkaProducer<String, String> txnProducer) {
        if (logger.isDebugEnabled()) {
            logger.debug(message, cycle, txnProducer, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCntTL());
        }
    }

    /** Maps an exception raised by a begin/commit step to a {@link TxnProcResult}. */
    private static TxnProcResult classifyTxnError(Exception e) {
        if (e instanceof IllegalStateException
            || e instanceof ProducerFencedException
            || e instanceof UnsupportedOperationException
            || e instanceof AuthorizationException) {
            return TxnProcResult.FATAL_ERROR;
        }
        if (e instanceof TimeoutException || e instanceof InterruptException) {
            return TxnProcResult.RECOVERABLE_ERROR;
        }
        return TxnProcResult.UNKNOWN_ERROR;
    }

    /**
     * Begins or rolls over the transaction for this cycle and stores the outcome in
     * the thread-local result. Does nothing but record SUCCESS when transactions are
     * not enabled.
     *
     * <p>A transaction is begun when the thread's counter is 0. It is committed and a
     * new one begun when the counter is a multiple of {@code txnBatchNum - 1}, or when
     * this is the last cycle ({@code totalCycleNum - 1}).
     */
    private void processMsgTransaction(long cycle, KafkaProducer<String, String> txnProducer) {
        TxnProcResult outcome = TxnProcResult.SUCCESS;

        if (transactionEnabled) {
            int sentCnt = getTxnBatchTrackingCntTL();
            try {
                if (sentCnt == 0) {
                    txnProducer.beginTransaction();
                    logTxnDebug("New transaction started ( {}, {}, {}, {}, {} )", cycle, txnProducer);
                } else if (sentCnt % (txnBatchNum - 1) == 0
                    || cycle == this.kafkaSpace.getTotalCycleNum() - 1) {
                    synchronized (this) {
                        logTxnDebug("Start committing transaction ... ( {}, {}, {}, {}, {} )", cycle, txnProducer);
                        txnProducer.commitTransaction();
                        logTxnDebug("Transaction committed ( {}, {}, {}, {}, {} )", cycle, txnProducer);
                        txnProducer.beginTransaction();
                        logTxnDebug("New transaction started ( {}, {}, {}, {}, {} )", cycle, txnProducer);
                    }
                }
            } catch (Exception e) {
                outcome = classifyTxnError(e);
                // Use the logger (not stderr) so the failure lands in the regular log output.
                logger.error("Error while beginning or committing a transaction ({}) - {}: {}",
                    cycle, txnProducer, outcome, e);
            }
        }

        setTxnProcResultTL(outcome);
    }

    /**
     * Sends the record for one cycle.
     *
     * <p>While the space is shutting down nothing is sent; an open transaction is
     * aborted on a best-effort basis instead. Otherwise the transaction step for the
     * cycle is run first and the message is only sent when that step succeeded.
     *
     * @param cycle    the cycle number
     * @param cycleObj the record to send; must be a {@code ProducerRecord}
     * @throws KafkaAdapterUnexpectedException on a fatal transaction error, when aborting
     *                                         after a recoverable error fails, or on a
     *                                         fatal/unexpected send error
     */
    @Override // io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient
    void cycleMsgProcess(final long cycle, Object cycleObj) {
        assert cycleObj != null;

        if (this.kafkaSpace.isShuttigDown()) {
            if (transactionEnabled) {
                try {
                    producer.abortTransaction();
                    logTxnDebug("Abort open transaction while shutting down ( {}, {}, {}, {}, {} )", cycle, producer);
                } catch (Exception e) {
                    // Best effort only: we are shutting down anyway.
                    logger.error("Failed to abort the open transaction while shutting down ({}) - {}",
                        cycle, producer, e);
                }
            }
            return;
        }

        processMsgTransaction(cycle, producer);
        TxnProcResult txnResult = getTxnProcResultTL();

        if (txnResult == TxnProcResult.RECOVERABLE_ERROR) {
            // NOTE(review): the sent-message counter is not reset after the abort, so the
            // next cycle will not begin a fresh transaction by itself - confirm intended.
            try {
                producer.abortTransaction();
            } catch (Exception e) {
                // The exception type only exposes (String) and (Exception) constructors
                // here, so keep the cause in the log rather than dropping it.
                logger.error("Aborting transaction failed! ({}) - {}", cycle, producer, e);
                throw new KafkaAdapterUnexpectedException("Aborting transaction failed!");
            }
        } else if (txnResult == TxnProcResult.FATAL_ERROR) {
            throw new KafkaAdapterUnexpectedException("Fatal error when initializing or committing transactions!");
        } else if (txnResult == TxnProcResult.UNKNOWN_ERROR) {
            logger.debug("Unexpected error when initializing or committing transactions!");
        }

        @SuppressWarnings("unchecked") // the op dispenser is expected to hand over String/String records
        ProducerRecord<String, String> message = (ProducerRecord<String, String>) cycleObj;

        try {
            if (txnResult == TxnProcResult.SUCCESS) {
                Future<RecordMetadata> ack = producer.send(message, (ackMetadata, sendError) -> {
                    if (!asyncMsgAck) {
                        // The sync path reports success/failure through Future.get() below.
                        return;
                    }
                    if (sendError != null) {
                        // A non-null exception means the send failed; do not report success.
                        logger.error("Message sending with async ack. failed ({}) - {}",
                            cycle, producer, sendError);
                    } else if (logger.isDebugEnabled()) {
                        logger.debug("Message sending with async ack. is successful ({}) - {}, {}",
                            cycle, producer, ackMetadata);
                    }
                });

                if (!asyncMsgAck) {
                    try {
                        RecordMetadata syncMetadata = ack.get();
                        if (logger.isDebugEnabled()) {
                            logger.debug("Message sending with sync ack. is successful ({}) - {}, {}",
                                cycle, producer, syncMetadata);
                        }
                    } catch (InterruptedException | ExecutionException e) {
                        if (e instanceof InterruptedException) {
                            // Preserve the interrupt status for callers further up the stack.
                            Thread.currentThread().interrupt();
                        }
                        KafkaAdapterUtil.messageErrorHandling(
                            e,
                            this.kafkaSpace.isStrictMsgErrorHandling(),
                            "Unexpected error when waiting to receive message-send ack from the Kafka cluster.\n-----\n" + e);
                    }
                }

                incTxnBatchTrackingCnt();
            }
        } catch (ProducerFencedException | OutOfOrderSequenceException
                 | UnsupportedOperationException | AuthorizationException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("Fatal error when sending a message ({}) - {}, {}", cycle, producer, message);
            }
            throw new KafkaAdapterUnexpectedException(e);
        } catch (IllegalStateException | KafkaException e) {
            // Deliberately not rethrown (matches previous behavior), but no longer silent.
            logger.warn("Error when sending a message ({}) - {}, {}", cycle, producer, message, e);
        } catch (Exception e) {
            throw new KafkaAdapterUnexpectedException(e);
        }
    }

    /**
     * Commits the open transaction (when transactions are enabled), closes the producer
     * and clears the calling thread's tracking state.
     *
     * <p>The producer is closed and the thread-local state is cleared even when the
     * final commit fails. No exception is propagated to the caller.
     */
    @Override // io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient
    public void close() {
        try {
            if (producer != null) {
                try {
                    if (transactionEnabled) {
                        producer.commitTransaction();
                    }
                } finally {
                    // Always release the producer, even if the final commit failed.
                    producer.close();
                }
            }
        } catch (IllegalStateException ignored) {
            // Previously ignored as well; presumably raised when there is no open
            // transaction or the producer is already closed - nothing left to do.
        } catch (Exception e) {
            logger.error("Unexpected error when closing the producer - {}", producer, e);
        } finally {
            txnBatchTrackingCntTL.remove();
            txnProcResultTL.remove();
        }
    }
}
