package com.ken.event.action.mq.kafka.interceptor;

import com.ken.event.standard.entity.KenMessage;
import com.ken.event.standard.inter.CoreProducerStandard;
import com.ken.event.standard.interceptor.KenProducerInterceptor;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/ken/event/action/mq/kafka/interceptor/DelayMessageProducerInterceptor.class */
public class DelayMessageProducerInterceptor implements KenProducerInterceptor {
    private static final Logger log = LoggerFactory.getLogger(DelayMessageProducerInterceptor.class);
    private DelayQueue<MyDelayTask> delayQueue = new DelayQueue<>();

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    private CoreProducerStandard coreProducerStandard;

    /* loaded from: input_file:com/ken/event/action/mq/kafka/interceptor/DelayMessageProducerInterceptor$MyDelayTask.class */
    private static class MyDelayTask implements Delayed {
        private KenMessage kenMessage;
        private long ttl;

        public MyDelayTask(KenMessage kenMessage) {
            this.kenMessage = kenMessage;
            this.ttl = System.currentTimeMillis() + kenMessage.getDelayTime().longValue();
        }

        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            return this.ttl - System.currentTimeMillis();
        }

        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            return (int) (this.ttl - ((MyDelayTask) delayed).getTtl());
        }

        public KenMessage getKenMessage() {
            return this.kenMessage;
        }

        public long getTtl() {
            return this.ttl;
        }

        public void setKenMessage(KenMessage kenMessage) {
            this.kenMessage = kenMessage;
        }

        public void setTtl(long j) {
            this.ttl = j;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof MyDelayTask)) {
                return false;
            }
            MyDelayTask myDelayTask = (MyDelayTask) obj;
            if (!myDelayTask.canEqual(this)) {
                return false;
            }
            KenMessage kenMessage = getKenMessage();
            KenMessage kenMessage2 = myDelayTask.getKenMessage();
            if (kenMessage == null) {
                if (kenMessage2 != null) {
                    return false;
                }
            } else if (!kenMessage.equals(kenMessage2)) {
                return false;
            }
            return getTtl() == myDelayTask.getTtl();
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof MyDelayTask;
        }

        public int hashCode() {
            KenMessage kenMessage = getKenMessage();
            int hashCode = (1 * 59) + (kenMessage == null ? 43 : kenMessage.hashCode());
            long ttl = getTtl();
            return (hashCode * 59) + ((int) ((ttl >>> 32) ^ ttl));
        }

        public String toString() {
            return "DelayMessageProducerInterceptor.MyDelayTask(kenMessage=" + getKenMessage() + ", ttl=" + getTtl() + ")";
        }
    }

    @PostConstruct
    public void init() {
        log.info("[delay queue]延迟消息拦截器加载....");
        log.info("[delay queue]获取Spring默认线程池，处理延迟队列... {}", this.threadPoolTaskExecutor);
        this.threadPoolTaskExecutor.submit(() -> {
            while (true) {
                try {
                    MyDelayTask take = this.delayQueue.take();
                    log.debug("[delay queue] 获取到延迟任务并处理... {}", take);
                    KenMessage kenMessage = take.getKenMessage();
                    kenMessage.addAttr("delay", true);
                    this.coreProducerStandard.sendMessage(kenMessage);
                } catch (InterruptedException e) {
                    log.error("[delay queue] 处理延迟任务异常！", e);
                }
            }
        });
    }

    public boolean isSupport(KenMessage kenMessage) {
        return kenMessage.getMsgType().intValue() == 2 && !kenMessage.getAttr().containsKey("delay");
    }

    public KenMessage interceptor(KenMessage kenMessage) {
        log.info("[delay queue]经过了延迟拦截器处理....");
        if (kenMessage.getDelayTime().longValue() <= 0) {
            return kenMessage;
        }
        this.delayQueue.add((DelayQueue<MyDelayTask>) new MyDelayTask(kenMessage));
        return null;
    }
}
