package com.jeesuite.kafka.consumer;

import com.jeesuite.common.json.JsonUtils;
import com.jeesuite.kafka.consumer.hanlder.RetryErrorMessageHandler;
import com.jeesuite.kafka.handler.MessageHandler;
import com.jeesuite.kafka.message.DefaultMessage;
import com.jeesuite.kafka.thread.StandardThreadExecutor;
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jeesuite/kafka/consumer/ErrorMessageProcessor.class */
/**
 * Re-processes messages whose handling failed, with a growing delay between attempts.
 *
 * <p>Failed messages are wrapped in {@link PriorityTask}s and kept in a priority queue ordered by
 * their next fire time. A single polling loop, submitted to the executor by the constructor, takes
 * the earliest task and either runs it (when due) or puts it back after a short sleep. Tasks are
 * executed inline on the polling thread.
 *
 * <p>When a task has failed {@code maxReties} times it is handed to the configured
 * {@link RetryErrorMessageHandler}; if none is configured the message is logged and dropped.
 */
public class ErrorMessageProcessor implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(ErrorMessageProcessor.class);

    /** Queue size above which {@link #submit} logs a warning (also the queue's initial capacity). */
    private static final int QUEUE_WARN_THRESHOLD = 1000;

    /** Longest time, in milliseconds, the polling loop sleeps before re-checking the queue. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /** Base retry delay in milliseconds. */
    private long retryPeriodUnit;
    /** Number of failed re-processing attempts after which a message is given up on. */
    private int maxReties;
    /** Optional handler invoked once retries are exhausted; may be {@code null}. */
    private RetryErrorMessageHandler retryErrorHandler;
    private ExecutorService executor;
    private final PriorityBlockingQueue<PriorityTask> taskQueue = new PriorityBlockingQueue<>(QUEUE_WARN_THRESHOLD);
    private AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * A pending retry of one message, ordered by {@link #nextFireTime} (earliest first).
     *
     * <p>A task whose message is {@code null} is used by {@link ErrorMessageProcessor#close()} as a
     * marker that tells the polling loop to stop.
     */
    class PriorityTask implements Runnable, Comparable<PriorityTask> {
        final DefaultMessage message;
        final MessageHandler messageHandler;
        /** Number of failed re-processing attempts so far. */
        int retryCount;
        /** Epoch milliseconds at or after which this task may run. */
        long nextFireTime;

        /**
         * Creates a task that becomes due one retry period from now.
         *
         * @param errorMessageProcessor processor whose retry period is used for the initial delay
         * @param defaultMessage message to re-process
         * @param messageHandler handler used to re-process the message
         */
        public PriorityTask(ErrorMessageProcessor errorMessageProcessor, DefaultMessage defaultMessage, MessageHandler messageHandler) {
            this(defaultMessage, messageHandler, System.currentTimeMillis() + errorMessageProcessor.retryPeriodUnit);
        }

        /**
         * Creates a task with an explicit fire time.
         *
         * @param defaultMessage message to re-process
         * @param messageHandler handler used to re-process the message
         * @param j epoch milliseconds at which the task becomes due
         */
        public PriorityTask(DefaultMessage defaultMessage, MessageHandler messageHandler, long j) {
            this.retryCount = 0;
            this.message = defaultMessage;
            this.messageHandler = messageHandler;
            this.nextFireTime = j;
        }

        public DefaultMessage getMessage() {
            return this.message;
        }

        /** Re-processes the message once; on failure counts the attempt and schedules a retry. */
        @Override // java.lang.Runnable
        public void run() {
            try {
                ErrorMessageProcessor.logger.debug("begin re-process message:" + this.message.getMsgId());
                this.messageHandler.p2Process(this.message);
            } catch (Exception e) {
                this.retryCount++;
                ErrorMessageProcessor.logger.warn("retry[{}] mssageId[{}] error", Integer.valueOf(this.retryCount), this.message.getMsgId());
                retry();
            }
        }

        /**
         * Re-queues this task with a longer delay, or gives up once the retry limit is reached.
         */
        private void retry() {
            // "<" rather than "!=": retryCount is already >= 1 here, so a limit of 0 (or less)
            // would never compare equal and the message would be retried forever.
            if (this.retryCount < ErrorMessageProcessor.this.maxReties) {
                // Each further attempt is pushed back by retryCount retry periods.
                this.nextFireTime += this.retryCount * ErrorMessageProcessor.this.retryPeriodUnit;
                ErrorMessageProcessor.this.taskQueue.add(this);
                ErrorMessageProcessor.logger.debug("retry_resubmit mssageId[{}] task to queue,next fireTime:{}", this.message.getMsgId(), Long.valueOf(this.nextFireTime));
                return;
            }
            if (ErrorMessageProcessor.this.retryErrorHandler == null) {
                ErrorMessageProcessor.logger.warn("retry_skip process message[{}] maxReties over {} time error!!!", JsonUtils.toJson(this.message), Integer.valueOf(ErrorMessageProcessor.this.maxReties));
                return;
            }
            try {
                ErrorMessageProcessor.this.retryErrorHandler.process(ConsumerContext.getInstance().getGroupId(), this.message.topic(), this.message);
            } catch (Exception e) {
                ErrorMessageProcessor.logger.warn("persistHandler error,topic[" + this.message.topic() + "]", e);
            }
        }

        /** Orders tasks by fire time, earliest first. */
        @Override // java.lang.Comparable
        public int compareTo(PriorityTask priorityTask) {
            // Long.compare avoids the truncation/sign flip of casting a long difference to int.
            return Long.compare(this.nextFireTime, priorityTask.nextFireTime);
        }

        @Override
        public String toString() {
            // The stop-marker task has a null message and handler, so guard both.
            return "PriorityTask [message=" + (this.message == null ? null : this.message.getMsgId())
                    + ", messageHandler=" + (this.messageHandler == null ? null : this.messageHandler.getClass().getSimpleName())
                    + ", retryCount=" + this.retryCount + ", nextFireTime=" + this.nextFireTime + "]";
        }
    }

    /**
     * @return number of tasks currently waiting in the retry queue
     */
    public int getRetryTaskNums() {
        return this.taskQueue.size();
    }

    /**
     * Creates the processor and starts its polling loop.
     *
     * @param i size of the fixed thread pool backing the processor
     * @param i2 base retry period in seconds
     * @param i3 number of failed retries after which a message is given up on
     * @param retryErrorMessageHandler handler invoked when retries are exhausted; may be {@code null}
     */
    public ErrorMessageProcessor(int i, int i2, int i3, RetryErrorMessageHandler retryErrorMessageHandler) {
        // Multiply as long so a large period in seconds cannot overflow int arithmetic.
        this.retryPeriodUnit = i2 * 1000L;
        this.maxReties = i3;
        this.retryErrorHandler = retryErrorMessageHandler;
        this.executor = Executors.newFixedThreadPool(i, new StandardThreadExecutor.StandardThreadFactory("ErrorMessageProcessor"));
        this.executor.submit(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                ErrorMessageProcessor.this.pollRetryQueue();
            }
        });
    }

    /**
     * Polling loop: repeatedly takes the earliest task and runs it when due, until the processor is
     * closed, a stop-marker task is taken, or the thread is interrupted.
     */
    private void pollRetryQueue() {
        while (!this.closed.get()) {
            try {
                PriorityTask priorityTask = this.taskQueue.take();
                if (priorityTask.getMessage() == null) {
                    // Stop marker queued by close().
                    return;
                }
                long remaining = priorityTask.nextFireTime - System.currentTimeMillis();
                if (remaining > 0) {
                    try {
                        // Wake up no later than the task's fire time, and at least once per
                        // interval so the closed flag and newly queued tasks are noticed.
                        TimeUnit.MILLISECONDS.sleep(Math.min(remaining, POLL_INTERVAL_MILLIS));
                    } finally {
                        // Always give the task back, even if the sleep was interrupted.
                        this.taskQueue.put(priorityTask);
                    }
                } else {
                    priorityTask.run();
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the executor and stop polling.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                logger.error("ErrorMessageProcessor polling error", e);
            }
        }
    }

    /**
     * Queues a failed message for re-processing after one retry period.
     *
     * @param defaultMessage message whose processing failed
     * @param messageHandler handler to re-invoke for the message
     */
    public void submit(DefaultMessage defaultMessage, MessageHandler messageHandler) {
        int size = this.taskQueue.size();
        if (size > QUEUE_WARN_THRESHOLD) {
            // The queue is unbounded; this is only a warning, the task is still accepted.
            logger.warn("ErrorMessageProcessor queue task count over:{}", Integer.valueOf(size));
        }
        this.taskQueue.add(new PriorityTask(this, defaultMessage, messageHandler));
    }

    /**
     * Stops the polling loop and shuts the executor down. Calling it again has no effect.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        // Stop marker: wakes the polling loop if it is blocked on an empty queue.
        this.taskQueue.add(new PriorityTask(this, null, null));
        try {
            // Give the polling loop a moment to observe the flag / marker.
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.executor.shutdown();
        logger.info("ErrorMessageDefaultProcessor closed");
    }
}
