package com.jeesuite.kafka.producer.handler;

import com.jeesuite.kafka.message.DefaultMessage;
import com.jeesuite.kafka.thread.StandardThreadExecutor;
import java.io.IOException;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jeesuite/kafka/producer/handler/SendErrorDelayRetryHandler.class */
/**
 * {@link ProducerEventHandler} that re-sends messages whose delivery failed.
 *
 * <p>Failed messages reported through {@link #onError} are wrapped in a {@link PriorityTask}
 * and placed on a priority queue ordered by next fire time. A single background worker thread
 * takes tasks from that queue and re-sends them with the supplied {@link KafkaProducer}. A task
 * whose re-send throws is re-queued with a growing delay until the configured number of retries
 * is used up.
 *
 * <p>{@link #close()} interrupts the worker thread; tasks still waiting in the queue are dropped.
 */
public class SendErrorDelayRetryHandler implements ProducerEventHandler {
    private static final Logger logger = LoggerFactory.getLogger(SendErrorDelayRetryHandler.class);

    /** Back-off step: the n-th retry is delayed by {@code n * 30s}. */
    private static final long RETRY_BACKOFF_STEP_MILLIS = 30_000L;

    /** Upper bound on how long the worker sleeps when the head of the queue is not due yet. */
    private static final long NOT_DUE_POLL_MILLIS = 100L;

    /** Pending retry tasks, ordered by {@link PriorityTask#nextFireTime} (earliest first). */
    private final PriorityBlockingQueue<PriorityTask> taskQueue = new PriorityBlockingQueue<>(1000);

    /**
     * Ids of messages that currently have a pending retry task; used to avoid queuing the same
     * message twice. Compound check-then-add operations synchronize on this list.
     */
    private final List<String> messageIdsInQueue = new Vector<>();

    /** Single-threaded executor hosting the queue worker. */
    private final ExecutorService executor = Executors.newFixedThreadPool(1, new StandardThreadExecutor.StandardThreadFactory("ErrorMessageProcessor"));

    private final KafkaProducer<String, Object> topicProducer;

    /** Maximum number of times a task is re-queued after a failed re-send. */
    private final int retries;

    /**
     * One pending re-send of a single message. Ordered by {@link #nextFireTime} so the queue
     * hands out the task that is due first.
     */
    class PriorityTask implements Runnable, Comparable<PriorityTask> {
        final String topicName;
        final DefaultMessage message;
        /** Number of times this task has been re-queued after a failed re-send. */
        int retryCount;
        /** Epoch milliseconds before which this task must not be executed. */
        long nextFireTime;

        /**
         * Creates a task that is due immediately.
         *
         * @param sendErrorDelayRetryHandler unused; kept for signature compatibility
         * @param str topic to send the message to
         * @param defaultMessage message to re-send
         */
        public PriorityTask(SendErrorDelayRetryHandler sendErrorDelayRetryHandler, String str, DefaultMessage defaultMessage) {
            this(str, defaultMessage, System.currentTimeMillis());
        }

        /**
         * @param str topic to send the message to
         * @param defaultMessage message to re-send
         * @param j epoch milliseconds at which the task becomes due
         */
        public PriorityTask(String str, DefaultMessage defaultMessage, long j) {
            this.retryCount = 0;
            this.topicName = str;
            this.message = defaultMessage;
            this.nextFireTime = j;
        }

        /**
         * Re-sends the message; if the send call throws, schedules another attempt via
         * {@link #retry()}.
         */
        @Override // java.lang.Runnable
        public void run() {
            String msgId = this.message.getMsgId();
            try {
                logger.debug("begin re process message:{}", this);
                // NOTE(review): KafkaProducer.send is asynchronous, so failures reported later
                // through the returned Future are not observed here - confirm whether a send
                // callback should also trigger retry().
                topicProducer.send(new ProducerRecord<String, Object>(this.topicName, msgId, this.message));
                messageIdsInQueue.remove(msgId);
            } catch (Exception e) {
                logger.warn("retry mssageId[{}] error", msgId, e);
                retry();
            }
        }

        /**
         * Re-queues this task with a delay of {@code retryCount * 30s} from now, or gives up
         * (and releases the message id) once the retry budget is exhausted.
         */
        private void retry() {
            String msgId = this.message.getMsgId();
            if (this.retryCount >= retries) {
                // Release the id so a later failure of the same message can be queued again.
                messageIdsInQueue.remove(msgId);
                logger.warn("give up retrying mssageId[{}] after {} retries", msgId, this.retryCount);
                return;
            }
            // Increment first so that the very first retry already waits one back-off step.
            this.retryCount++;
            long fireTime = System.currentTimeMillis() + this.retryCount * RETRY_BACKOFF_STEP_MILLIS;
            this.nextFireTime = fireTime;
            taskQueue.add(this);
            logger.debug("re submit mssageId[{}] task to queue,next fireTime:{}", msgId, fireTime);
        }

        /** Orders tasks by fire time, earliest first. */
        @Override // java.lang.Comparable
        public int compareTo(PriorityTask priorityTask) {
            // Long.compare avoids the sign errors of casting a long difference to int.
            return Long.compare(this.nextFireTime, priorityTask.nextFireTime);
        }

        @Override
        public String toString() {
            return "PriorityTask [message=" + this.message.getMsgId() + ", retryCount=" + this.retryCount + ", nextFireTime=" + this.nextFireTime + "]";
        }
    }

    /**
     * Creates the handler and starts its background worker.
     *
     * @param str not used by this class
     * @param kafkaProducer producer used to re-send failed messages
     * @param i maximum number of re-queues per message after a failed re-send
     */
    public SendErrorDelayRetryHandler(String str, KafkaProducer<String, Object> kafkaProducer, int i) {
        this.topicProducer = kafkaProducer;
        this.retries = i;
        this.executor.submit(new Runnable() { // from class: com.jeesuite.kafka.producer.handler.SendErrorDelayRetryHandler.1
            @Override // java.lang.Runnable
            public void run() {
                processQueue();
            }
        });
    }

    /**
     * Worker loop: takes the earliest task, runs it if it is due, otherwise puts it back and
     * waits briefly. Returns when the worker thread is interrupted.
     */
    private void processQueue() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                PriorityTask task = this.taskQueue.take();
                long remaining = task.nextFireTime - System.currentTimeMillis();
                if (remaining > 0) {
                    // Not due yet: put it back so it is not lost, then wait. The sleep is capped
                    // so that a task queued in the meantime with an earlier fire time is noticed.
                    this.taskQueue.add(task);
                    TimeUnit.MILLISECONDS.sleep(Math.min(remaining, NOT_DUE_POLL_MILLIS));
                } else {
                    task.run();
                }
            } catch (InterruptedException e) {
                // close() interrupts the worker; restore the flag and stop.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                logger.warn("unexpected error while processing retry queue", e);
            }
        }
    }

    /** No-op: successful sends need no handling here. */
    @Override // com.jeesuite.kafka.producer.handler.ProducerEventHandler
    public void onSuccessed(String str, RecordMetadata recordMetadata) {
    }

    /**
     * Queues the failed message for an immediate re-send unless it is already pending.
     *
     * @param str topic the message was sent to
     * @param defaultMessage message whose send failed
     * @param z when {@code false} the failure is ignored and nothing is queued
     */
    @Override // com.jeesuite.kafka.producer.handler.ProducerEventHandler
    public void onError(String str, DefaultMessage defaultMessage, boolean z) {
        if (!z) {
            return;
        }
        String msgId = defaultMessage.getMsgId();
        // Check-and-record atomically, and record the id BEFORE queuing the task: otherwise the
        // worker could finish the task and remove the id before it has been added.
        synchronized (this.messageIdsInQueue) {
            if (this.messageIdsInQueue.contains(msgId)) {
                return;
            }
            this.messageIdsInQueue.add(msgId);
        }
        this.taskQueue.add(new PriorityTask(this, str, defaultMessage));
    }

    /** Stops the background worker by interrupting it. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.executor.shutdownNow();
    }
}
