package io.github.icodegarden.commons.kafka.reliability;

import io.github.icodegarden.commons.kafka.RetryableExecutor;
import io.github.icodegarden.commons.lang.concurrent.NamedThreadFactory;
import io.github.icodegarden.commons.lang.result.Result1;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/icodegarden/commons/kafka/reliability/AbstractReliabilityProcessor.class */
public abstract class AbstractReliabilityProcessor<K, V> implements ReliabilityProcessor<K, V> {
    private static final Logger log = LoggerFactory.getLogger(AbstractReliabilityProcessor.class);
    protected static final OffsetCommitCallback OFFSETCOMMIT_CALLBACK = new OffsetCommitCallback() { // from class: io.github.icodegarden.commons.kafka.reliability.AbstractReliabilityProcessor.1
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            if (exc != null) {
                AbstractReliabilityProcessor.log.warn("commit offsets error,offsets:{},these offsets can be cover by next commit", map, exc);
            }
            if (AbstractReliabilityProcessor.log.isDebugEnabled()) {
                AbstractReliabilityProcessor.log.debug("offsets commit callback success:{}", Boolean.valueOf(exc == null));
            }
        }
    };
    private static final RetryableExecutor<Boolean> RETRYABLE_EXECUTOR = new RetryableExecutor<>();
    protected KafkaConsumer<K, V> consumer;
    protected ReliabilityHandler<K, V> recordReliabilityHandler;
    protected ThreadPoolExecutor handleRecordsThreadPool;
    protected String processorName = getClass().getSimpleName();
    protected AtomicLong processingCount = new AtomicLong(0);

    /* loaded from: input_file:io/github/icodegarden/commons/kafka/reliability/AbstractReliabilityProcessor$CallerRunsExecutor.class */
    private class CallerRunsExecutor extends ThreadPoolExecutor {
        public CallerRunsExecutor() {
            super(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue());
        }

        @Override // java.util.concurrent.ThreadPoolExecutor, java.util.concurrent.Executor
        public void execute(Runnable runnable) {
            runnable.run();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public AbstractReliabilityProcessor(KafkaConsumer<K, V> kafkaConsumer, ReliabilityHandler<K, V> reliabilityHandler, Properties properties) {
        this.consumer = kafkaConsumer;
        this.recordReliabilityHandler = reliabilityHandler;
        int intValue = ((Integer) properties.getOrDefault(PropertiesConstants.HANDLERECORDS_THREADPOOL_CORESIZE.getT1(), PropertiesConstants.HANDLERECORDS_THREADPOOL_CORESIZE.getT2())).intValue();
        int intValue2 = ((Integer) properties.getOrDefault(PropertiesConstants.HANDLERECORDS_THREADPOOL_MAXSIZE.getT1(), PropertiesConstants.HANDLERECORDS_THREADPOOL_MAXSIZE.getT2())).intValue();
        long longValue = ((Long) properties.getOrDefault(PropertiesConstants.HANDLERECORDS_THREADPOOL_KEEPALIVEMILLIS.getT1(), PropertiesConstants.HANDLERECORDS_THREADPOOL_KEEPALIVEMILLIS.getT2())).longValue();
        int intValue3 = ((Integer) properties.getOrDefault(PropertiesConstants.HANDLERECORDS_THREADPOOL_QUEUESIZE.getT1(), PropertiesConstants.HANDLERECORDS_THREADPOOL_QUEUESIZE.getT2())).intValue();
        String str = (String) properties.getOrDefault(PropertiesConstants.HANDLERECORDS_THREADPOOL_NAMEPREFIX.getT1(), PropertiesConstants.HANDLERECORDS_THREADPOOL_NAMEPREFIX.getT2());
        RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) properties.getOrDefault(PropertiesConstants.HANDLERECORDS_THREADPOOL_REJECTEDPOLICY.getT1(), PropertiesConstants.HANDLERECORDS_THREADPOOL_REJECTEDPOLICY.getT2());
        if (intValue2 <= 1) {
            this.handleRecordsThreadPool = new CallerRunsExecutor();
        } else {
            this.handleRecordsThreadPool = new ThreadPoolExecutor(intValue, intValue2, longValue, TimeUnit.MILLISECONDS, intValue3 == 0 ? new SynchronousQueue() : new LinkedBlockingQueue(intValue3), new NamedThreadFactory(str), rejectedExecutionHandler);
        }
    }

    @Override // io.github.icodegarden.commons.kafka.reliability.ReliabilityProcessor
    public abstract void handleReliability(ConsumerRecords<K, V> consumerRecords);

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleReliability(ConsumerRecords<K, V> consumerRecords, boolean z) {
        handleReliability(consumerRecords, consumerRecords.count(), z);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleReliability(Collection<ConsumerRecord<K, V>> collection, boolean z) {
        handleReliability(collection, collection.size(), z);
    }

    private void handleReliability(Iterable<ConsumerRecord<K, V>> iterable, int i, boolean z) {
        if (i > 0) {
            CountDownLatch countDownLatch = new CountDownLatch(i);
            iterable.forEach(consumerRecord -> {
                try {
                    this.handleRecordsThreadPool.execute(() -> {
                        try {
                            Result1<Exception> execute = RETRYABLE_EXECUTOR.execute(() -> {
                                return Boolean.valueOf(this.recordReliabilityHandler.handle(consumerRecord));
                            }, this.recordReliabilityHandler.handleRetries(), this.recordReliabilityHandler.handleRetryBackoffMillis(), z);
                            if (!execute.isSuccess()) {
                                Exception exc = (Exception) execute.getT1();
                                execute = RETRYABLE_EXECUTOR.execute(() -> {
                                    return Boolean.valueOf(this.recordReliabilityHandler.primaryStore(consumerRecord, exc));
                                }, this.recordReliabilityHandler.storeRetries(), this.recordReliabilityHandler.storeRetryBackoffMillis());
                            }
                            if (!execute.isSuccess()) {
                                Exception exc2 = (Exception) execute.getT1();
                                execute = RETRYABLE_EXECUTOR.execute(() -> {
                                    return Boolean.valueOf(this.recordReliabilityHandler.secondaryStore(consumerRecord, exc2));
                                }, this.recordReliabilityHandler.storeRetries(), this.recordReliabilityHandler.storeRetryBackoffMillis());
                            }
                            if (!execute.isSuccess()) {
                                try {
                                    this.recordReliabilityHandler.onStoreFailed(consumerRecord, (Throwable) execute.getT1());
                                } catch (Exception e) {
                                    log.error("ex on storeFailed", e);
                                }
                            }
                        } finally {
                            countDownLatch.countDown();
                        }
                    });
                } catch (Exception e) {
                    countDownLatch.countDown();
                    log.error("ex on execution", e);
                }
            });
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
            }
        }
    }

    @Override // io.github.icodegarden.commons.kafka.reliability.ReliabilityProcessor
    public long processingCount() {
        return this.processingCount.get();
    }

    public void waitProcessingComplete(long j) {
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (processingCount() <= 0 || j3 >= j) {
                break;
            }
            log.info("wait processing records to be complete ,processingCount:{} ,left timeout millis:{}", Long.valueOf(processingCount()), Long.valueOf(j - j3));
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
            }
            j2 = j3 + 100;
        }
        if (processingCount() > 0) {
            log.warn("wait timeout , there are {} records in process", Long.valueOf(processingCount()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Set<TopicPartition> resolveTopicPartitions(Collection<ConsumerRecord<K, V>> collection) {
        return (Set) collection.stream().map(consumerRecord -> {
            return new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        }).collect(Collectors.toSet());
    }

    public void setProcessorName(String str) {
        this.processorName = str;
    }
}
