package io.nosqlbench.adapter.kafka.ops;

import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.class */
/**
 * Kafka consumer client that polls for messages once per cycle and, when auto-commit is
 * disabled, commits the consumed offsets manually in batches.
 *
 * <p>Polling and committing inside {@link #cycleMsgProcess(long, Object)} are serialized on this
 * instance. The count of messages received since the last manual commit is kept per thread.
 */
public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
    private static final Logger logger = LogManager.getLogger("OpTimeTrackKafkaConsumer");

    /** Timeout, in milliseconds, handed to every {@code poll} call. */
    private final int msgPoolIntervalInMs;

    /** Whether manual commits use {@code commitAsync} ({@code true}) or {@code commitSync}. */
    private final boolean asyncMsgCommit;

    /** When {@code true}, no manual commit is issued while processing messages. */
    private final boolean autoCommitEnabled;

    /** Batch size used to decide when a manual commit is due. */
    private final int maxMsgCntPerCommit;

    /** Per-thread count of messages received since the last manual commit. */
    private final ThreadLocal<Integer> manualCommitTrackingCnt = ThreadLocal.withInitial(() -> 0);

    private final KafkaConsumer<String, String> consumer;

    /**
     * Creates a consumer client.
     *
     * @param kafkaSpace    the space this client belongs to; passed to the superclass
     * @param z             whether manual commits are asynchronous
     * @param i             poll timeout in milliseconds
     * @param z2            whether auto-commit is enabled (disables manual commits per message)
     * @param i2            number of tracked messages per manual commit
     * @param kafkaConsumer the Kafka consumer to poll and commit with
     */
    public OpTimeTrackKafkaConsumer(KafkaSpace kafkaSpace, boolean z, int i, boolean z2, int i2, KafkaConsumer<String, String> kafkaConsumer) {
        super(kafkaSpace);
        this.msgPoolIntervalInMs = i;
        this.asyncMsgCommit = z;
        this.autoCommitEnabled = z2;
        this.maxMsgCntPerCommit = i2;
        this.consumer = kafkaConsumer;
    }

    /**
     * Returns the calling thread's count of messages received since the last manual commit.
     *
     * @return the current per-thread tracking count
     */
    public int getManualCommitTrackingCnt() {
        return this.manualCommitTrackingCnt.get();
    }

    /** Increments the calling thread's manual-commit tracking count by one. */
    public void incManualCommitTrackingCnt() {
        this.manualCommitTrackingCnt.set(getManualCommitTrackingCnt() + 1);
    }

    /** Resets the calling thread's manual-commit tracking count to zero. */
    public void resetManualCommitTrackingCnt() {
        this.manualCommitTrackingCnt.set(0);
    }

    /**
     * Decides whether a manual commit is due for the given cycle.
     *
     * <p>A commit is due when auto-commit is disabled and either the tracking count is a positive
     * multiple of {@code maxMsgCntPerCommit}, or the cycle is the last one
     * ({@code cycle >= totalCycleNum - 1}).
     *
     * @param cycle the current cycle number
     * @return {@code true} if offsets should be committed now
     */
    private boolean msgCommitNeeded(long cycle) {
        if (this.autoCommitEnabled) {
            return false;
        }
        int trackedCnt = getManualCommitTrackingCnt();
        // A batch size of zero would make the modulo throw ArithmeticException; in that case no
        // count-based commit is triggered and only the last-cycle rule applies.
        boolean batchReached = this.maxMsgCntPerCommit != 0
            && trackedCnt > 0
            && trackedCnt % this.maxMsgCntPerCommit == 0;
        boolean commitNeeded = batchReached || cycle >= this.kafkaSpace.getTotalCycleNum() - 1;
        if (commitNeeded && logger.isDebugEnabled()) {
            // Log the batch size, the tracked count and the cycle (not the ThreadLocal object itself).
            logger.debug("Manually commit message ({}, {}, {})", this.maxMsgCntPerCommit, trackedCnt, cycle);
        }
        return commitNeeded;
    }

    /**
     * Renders a received record for debug logging: the message-sequence header (if present with a
     * non-null value), then key and value.
     *
     * @param consumerRecord the record to describe
     * @return a human-readable summary of the record
     */
    private String printRecvedMsg(ConsumerRecord<String, String> consumerRecord) {
        StringBuilder text = new StringBuilder();
        Header seqHeader = consumerRecord.headers().lastHeader(KafkaAdapterUtil.NB_MSG_SEQ_PROP);
        if (seqHeader != null && seqHeader.value() != null) {
            // Decode with an explicit charset rather than the platform default.
            text.append("Header (MsgSeq): ")
                .append(new String(seqHeader.value(), StandardCharsets.UTF_8))
                .append("; ");
        }
        text.append("Key: ").append(consumerRecord.key()).append("; ");
        text.append("Value: ").append(consumerRecord.value()).append("; ");
        return text.toString();
    }

    /**
     * Polls the consumer once and processes every returned record; when auto-commit is disabled,
     * each record either triggers a manual commit (resetting the tracking count) or increments the
     * tracking count. Does nothing if the space reports it is shutting down.
     *
     * @param j   the current cycle number
     * @param obj unused by this implementation
     */
    @Override
    void cycleMsgProcess(final long j, Object obj) {
        if (this.kafkaSpace.isShuttigDown()) {
            return;
        }
        synchronized (this) {
            // NOTE(review): poll(long) is deprecated in newer kafka-clients in favor of
            // poll(Duration), which has different blocking semantics; kept to preserve behavior.
            for (ConsumerRecord<String, String> consumerRecord : this.consumer.poll(this.msgPoolIntervalInMs)) {
                if (consumerRecord == null) {
                    continue;
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Receiving message is successful: [{}] - offset({}), cycle ({})", printRecvedMsg(consumerRecord), consumerRecord.offset(), j);
                }
                if (this.autoCommitEnabled) {
                    continue;
                }
                if (msgCommitNeeded(j)) {
                    commitOffsets(j);
                    resetManualCommitTrackingCnt();
                } else {
                    incManualCommitTrackingCnt();
                }
            }
        }
    }

    /**
     * Commits the consumer's offsets, asynchronously or synchronously depending on
     * {@code asyncMsgCommit}, and logs the outcome.
     *
     * @param cycle the cycle number, used only for logging
     */
    private void commitOffsets(long cycle) {
        if (this.asyncMsgCommit) {
            this.consumer.commitAsync((offsets, exc) -> {
                if (exc != null) {
                    // Surface failures even when debug logging is off.
                    logger.warn("Async message commit failed: cycle ({}), maxMsgCntPerCommit ({}), error ({})", cycle, this.maxMsgCntPerCommit, exc.getMessage(), exc);
                } else if (logger.isDebugEnabled()) {
                    logger.debug("Async message commit succeeded: cycle({}), maxMsgCntPerCommit ({})", cycle, this.maxMsgCntPerCommit);
                }
            });
            return;
        }
        this.consumer.commitSync();
        if (logger.isDebugEnabled()) {
            logger.debug("Sync message commit is successful: cycle ({}), maxMsgCntPerCommit ({})", cycle, this.maxMsgCntPerCommit);
        }
    }

    /**
     * Issues a final commit and closes the consumer on a best-effort basis; never throws.
     *
     * <p>The consumer is closed and the calling thread's tracking count is removed even when the
     * final commit fails.
     */
    @Override
    public void close() {
        try {
            if (this.consumer != null) {
                try {
                    if (this.asyncMsgCommit) {
                        this.consumer.commitAsync();
                    } else {
                        this.consumer.commitSync();
                    }
                } finally {
                    // Always release the consumer, even if the final commit threw.
                    this.consumer.close();
                }
            }
        } catch (IllegalStateException ignored) {
            // Deliberately ignored — presumably raised when the consumer was already closed.
        } catch (Exception e) {
            logger.warn("Failed to cleanly close the Kafka consumer", e);
        } finally {
            this.manualCommitTrackingCnt.remove();
        }
    }
}
