package io.github.icodegarden.commons.kafka.reliability;

import java.io.IOException;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/icodegarden/commons/kafka/reliability/BatchCompletionReliabilityProcessor.class */
public class BatchCompletionReliabilityProcessor<K, V> extends AbstractReliabilityProcessor<K, V> {
    private static final Logger log = LoggerFactory.getLogger(BatchCompletionReliabilityProcessor.class);

    public BatchCompletionReliabilityProcessor(KafkaConsumer<K, V> kafkaConsumer, ReliabilityHandler<K, V> reliabilityHandler, Properties properties) {
        super(kafkaConsumer, reliabilityHandler, properties);
    }

    @Override // io.github.icodegarden.commons.kafka.reliability.AbstractReliabilityProcessor, io.github.icodegarden.commons.kafka.reliability.ReliabilityProcessor
    public void handleReliability(ConsumerRecords<K, V> consumerRecords) {
        if (consumerRecords.isEmpty()) {
            return;
        }
        this.processingCount.addAndGet(consumerRecords.count());
        super.handleReliability((ConsumerRecords) consumerRecords, false);
        try {
            this.consumer.commitAsync(OFFSETCOMMIT_CALLBACK);
        } catch (Exception e) {
        } catch (Throwable th) {
            this.processingCount.addAndGet(-r0);
            throw th;
        }
        this.processingCount.addAndGet(-r0);
    }

    public void close() throws IOException {
        close(Duration.ofMillis(ReliabilityConsumer.DEFAULT_CLOSE_TIMEOUT_MS));
    }

    public void close(long j) throws IOException {
        log.info("start close {} named {} ", ReliabilityProcessor.class.getSimpleName(), this.processorName);
        try {
            waitProcessingComplete(j);
            this.handleRecordsThreadPool.shutdown();
            log.info("commitSync before close ,timeout millis:{}", Long.valueOf(j));
            try {
                this.consumer.commitSync(Duration.ofMillis(j));
                log.info("commitSync offsets before close success");
            } catch (CommitFailedException e) {
                log.error("ex on commitSync offsets before close", e);
            }
        } finally {
            this.consumer.close(Duration.ofMillis(j));
            log.info("{} named {} closed ...", ReliabilityProcessor.class.getSimpleName(), this.processorName);
        }
    }
}
