package io.github.icodegarden.commons.kafka.reliability;

import io.github.icodegarden.commons.kafka.DelayedRetryableReference;
import io.github.icodegarden.commons.lang.concurrent.NamedThreadFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/icodegarden/commons/kafka/reliability/PausePollReliabilityProcessor.class */
/**
 * Reliability processor that pauses the partitions of failed records instead of blocking the poll loop.
 *
 * <p>Each polled batch is dispatched record by record to {@code handleRecordsThreadPool}. If every
 * record succeeds, the batch offsets are committed asynchronously right away. Otherwise the partitions
 * of the failed records are paused and the failed records are handed to a dedicated pool, which passes
 * them to the superclass's {@code handleReliability(Collection, boolean)}. When that finishes, the batch
 * offsets and the paused partitions are put on two queues that are drained (commit, then resume) at the
 * end of every {@link #handleReliability(ConsumerRecords)} call.
 *
 * <p>Pool tasks only touch the two queues and {@code processingCount}; every call on {@code consumer}
 * made by this class happens on the thread that invokes {@code handleReliability} / {@code close}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class PausePollReliabilityProcessor<K, V> extends AbstractReliabilityProcessor<K, V> {
    private static final Logger log = LoggerFactory.getLogger(PausePollReliabilityProcessor.class);

    /** Runs the follow-up handling of records that failed in the first pass. */
    private final ThreadPoolExecutor handleFailedRecordsThreadPool;
    /** Partitions waiting to be resumed; filled by pool tasks, drained by the thread calling handleReliability. */
    private final Queue<DelayedRetryableReference<Collection<TopicPartition>>> pausedTopicPartitionsQueue;
    /** Offsets waiting to be committed; filled by pool tasks, drained by the thread calling handleReliability. */
    private final Queue<Map<TopicPartition, OffsetAndMetadata>> toCommitOffsetsQueue;

    /**
     * Creates the processor together with its pool for failed records and its two hand-over queues.
     *
     * @param kafkaConsumer      consumer passed through to the superclass
     * @param reliabilityHandler handler passed through to the superclass
     * @param properties         configuration passed through to the superclass
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public PausePollReliabilityProcessor(KafkaConsumer<K, V> kafkaConsumer, ReliabilityHandler<K, V> reliabilityHandler, Properties properties) {
        super(kafkaConsumer, reliabilityHandler, properties);
        this.handleFailedRecordsThreadPool = new ThreadPoolExecutor(
                0,
                Math.max(5, Runtime.getRuntime().availableProcessors() + 1),
                600000L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(16),
                new NamedThreadFactory("io.kafka.failedRecords.handle.threadpool"),
                new ThreadPoolExecutor.CallerRunsPolicy());
        // Raw on purpose: the generic bounds of DelayedRetryableReference are not visible from this class.
        this.pausedTopicPartitionsQueue = new DelayQueue();
        // Must be thread-safe: offered from handleFailedRecordsThreadPool, polled from the polling thread.
        this.toCommitOffsetsQueue = new LinkedBlockingQueue<>();
    }

    /**
     * Processes one polled batch (if it is not empty) and then drains the queued offset commits and
     * queued partition resumes produced by earlier batches.
     *
     * @param consumerRecords the batch returned by the last poll; may be empty
     */
    @Override // io.github.icodegarden.commons.kafka.reliability.AbstractReliabilityProcessor, io.github.icodegarden.commons.kafka.reliability.ReliabilityProcessor
    public void handleReliability(ConsumerRecords<K, V> consumerRecords) {
        if (!consumerRecords.isEmpty()) {
            processBatch(consumerRecords);
        }
        commitQueuedOffsets(offsets -> this.consumer.commitAsync(offsets, OFFSETCOMMIT_CALLBACK));
        resumeQueuedTopicPartitions();
    }

    /** Handles every record of a non-empty batch, then either commits directly or pauses and defers. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void processBatch(ConsumerRecords<K, V> consumerRecords) {
        final int count = consumerRecords.count();
        this.processingCount.addAndGet(count);

        // Raw on purpose: the element type expected by resolveTopicPartitions and by the superclass's
        // handleReliability(Collection, boolean) is not visible from this class.
        final List failedRecords = Collections.synchronizedList(new LinkedList());
        final CountDownLatch latch = new CountDownLatch(count);
        consumerRecords.forEach(rec -> {
            try {
                this.handleRecordsThreadPool.execute(() -> {
                    try {
                        if (!this.recordReliabilityHandler.handle(rec)) {
                            failedRecords.add(rec);
                        }
                    } catch (Exception e) {
                        log.warn("ex on handle record, treat as failed", e);
                        failedRecords.add(rec);
                    } finally {
                        latch.countDown();
                    }
                });
            } catch (Exception e) {
                // The task was never accepted, so account for it here to keep the latch balanced.
                failedRecords.add(rec);
                latch.countDown();
                log.error("ex on execution", e);
            }
        });

        try {
            latch.await();
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of losing it.
            Thread.currentThread().interrupt();
            log.warn("interrupted while waiting for records to be handled", e);
        }

        // Later records of the same partition overwrite earlier ones, leaving one entry per partition.
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(16);
        consumerRecords.forEach(rec ->
                offsets.put(new TopicPartition(rec.topic(), rec.partition()), new OffsetAndMetadata(rec.offset() + 1)));

        if (failedRecords.isEmpty()) {
            log.debug("failed records is empty , commit directly");
            commitAsyncThenRelease(offsets, count);
            return;
        }

        final Set<TopicPartition> toPause = resolveTopicPartitions(failedRecords);
        try {
            this.consumer.pause(toPause);
            log.debug("pause success...,topicPartitions:{}", toPause);
            this.handleFailedRecordsThreadPool.execute(() -> handleFailedThenRelease(failedRecords, offsets, toPause, count));
        } catch (Exception e) {
            log.error("pause error,toPauseTopicPartitions:{}", toPause, e);
            // Fallback: deal with the failed records synchronously on the calling thread.
            boolean delegated = false;
            try {
                super.handleReliability((Collection) failedRecords, true);
                delegated = true;
            } finally {
                if (!delegated) {
                    // The exception propagates; do not leave the batch counted as in-flight.
                    this.processingCount.addAndGet(-count);
                }
            }
            commitAsyncThenRelease(offsets, count);
        }
    }

    /**
     * Pool task for a batch with failures: delegates the failed records to the superclass, queues the
     * batch offsets for commit, and always queues the paused partitions for resume and releases the
     * in-flight count, even when the delegation throws. Offsets are only queued when the delegation
     * returned normally.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void handleFailedThenRelease(List failedRecords, Map<TopicPartition, OffsetAndMetadata> offsets,
            Set<TopicPartition> pausedPartitions, int count) {
        try {
            try {
                super.handleReliability((Collection) failedRecords, true);
                this.toCommitOffsetsQueue.offer(offsets);
            } finally {
                try {
                    new DelayedRetryableReference(pausedPartitions, this.pausedTopicPartitionsQueue, 0L).enQueue();
                } finally {
                    this.processingCount.addAndGet(-count);
                }
            }
            log.debug("toCommit & paused offered...");
        } catch (RuntimeException e) {
            log.error("handle failed records error,topicPartitions:{}", pausedPartitions, e);
        }
    }

    /** Commits the offsets asynchronously (best effort, failures are logged) and releases the in-flight count. */
    private void commitAsyncThenRelease(Map<TopicPartition, OffsetAndMetadata> offsets, int count) {
        try {
            this.consumer.commitAsync(offsets, OFFSETCOMMIT_CALLBACK);
        } catch (Exception e) {
            log.error("commit offsets error,commitOffsets:{}", offsets, e);
        } finally {
            this.processingCount.addAndGet(-count);
        }
    }

    /**
     * Drains {@code toCommitOffsetsQueue}, passing each entry to the given committer. An exception from
     * the committer is logged and does not stop the drain.
     *
     * @param committer action that performs the actual commit
     */
    private void commitQueuedOffsets(Consumer<Map<TopicPartition, OffsetAndMetadata>> committer) {
        Map<TopicPartition, OffsetAndMetadata> offsets;
        while ((offsets = this.toCommitOffsetsQueue.poll()) != null) {
            try {
                committer.accept(offsets);
            } catch (Exception e) {
                log.error("commit offsets error,commitOffsets:{}", offsets, e);
            }
        }
    }

    /**
     * Drains the entries currently available from {@code pausedTopicPartitionsQueue} and resumes their
     * partitions. When resuming a whole entry fails, a single-partition entry whose delay is positive is
     * re-queued as is; any other entry is retried partition by partition, and each partition that still
     * fails is queued again on its own.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void resumeQueuedTopicPartitions() {
        DelayedRetryableReference<Collection<TopicPartition>> ref;
        while ((ref = this.pausedTopicPartitionsQueue.poll()) != null) {
            final Collection<TopicPartition> partitions = ref.get();
            try {
                this.consumer.resume(partitions);
                log.debug("resume batch success...,topicPartitions:{}", partitions);
                continue;
            } catch (Exception e) {
                log.error("resume batch error,topicPartitions:{}", partitions, e);
            }

            final boolean requeueAsIs = partitions.size() == 1 && ref.getDelayMillis() > 0;
            if (requeueAsIs) {
                ref.enQueue();
                continue;
            }
            for (TopicPartition topicPartition : partitions) {
                final List<TopicPartition> single = Arrays.asList(topicPartition);
                try {
                    this.consumer.resume(single);
                    log.debug("resume one success...,topicPartition:{}", topicPartition);
                } catch (Exception e) {
                    log.error("resume one error,topicPartition:{}", topicPartition, e);
                    new DelayedRetryableReference(single, this.pausedTopicPartitionsQueue).enQueue();
                }
            }
        }
    }

    /**
     * Closes using {@code ReliabilityConsumer.DEFAULT_CLOSE_TIMEOUT_MS}.
     *
     * <p>NOTE(review): this calls a {@code close(Duration)} overload that is not declared in this class;
     * presumably it is inherited — confirm against the superclass.
     *
     * @throws IOException if closing fails
     */
    public void close() throws IOException {
        close(Duration.ofMillis(ReliabilityConsumer.DEFAULT_CLOSE_TIMEOUT_MS));
    }

    /**
     * Waits for in-flight processing, shuts both pools down, synchronously commits every queued offset,
     * clears the queues and closes the consumer. The commit/clear/close part runs even when waiting or
     * shutting down throws; the exception is then propagated after the cleanup.
     *
     * @param timeoutMillis timeout in milliseconds, used for the wait, for each synchronous commit and
     *                      for closing the consumer
     * @throws IOException declared for callers; propagated from the wait step if it raises one
     */
    public void close(long timeoutMillis) throws IOException {
        log.info("start close {} named {} ", ReliabilityProcessor.class.getSimpleName(), this.processorName);
        try {
            waitProcessingComplete(timeoutMillis);
            this.handleRecordsThreadPool.shutdown();
            this.handleFailedRecordsThreadPool.shutdown();
        } finally {
            log.info("commitSync offsets...");
            commitQueuedOffsets(offsets -> {
                try {
                    this.consumer.commitSync(offsets, Duration.ofMillis(timeoutMillis));
                    log.info("commitSync offsets before close success,commitOffsets:{}", offsets);
                } catch (CommitFailedException e) {
                    log.error("commitSync offsets before close error,commitOffsets:{}", offsets, e);
                }
            });
            this.toCommitOffsetsQueue.clear();
            this.pausedTopicPartitionsQueue.clear();
            this.consumer.close(Duration.ofMillis(timeoutMillis));
            log.info("{} named {} closed ...", ReliabilityProcessor.class.getSimpleName(), this.processorName);
        }
    }
}
