package com.github.seaframework.monitor.message.disruptor;

import cn.hutool.core.util.RandomUtil;
import com.github.seaframework.monitor.dto.MetricDTO;
import com.github.seaframework.monitor.message.simple.SimpleConsumerTask;
import com.google.common.collect.Queues;
import com.lmax.disruptor.WorkHandler;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/sea-monitor-1.0.0.jar:com/github/seaframework/monitor/message/disruptor/MessageConsumerHandler.class */
/**
 * Disruptor work handler that hands each event's {@link MetricDTO} over to a bounded in-memory
 * queue.
 *
 * <p>On the first event the handler lazily creates the queue, starts a dedicated thread running a
 * {@link SimpleConsumerTask} constructed over that queue, and registers a JVM shutdown hook that
 * calls {@code stop()} on the task.
 *
 * <p>NOTE(review): {@code init} and {@code queue} are plain, unsynchronized fields, so this class
 * assumes each instance is driven by a single thread — confirm against the Disruptor wiring.
 */
public class MessageConsumerHandler implements WorkHandler<MessageEvent> {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumerHandler.class);

    /** Counter used to give every consumer thread a unique name suffix across all handlers. */
    private static final AtomicLong count = new AtomicLong(0);

    /** Capacity of the queue placed between this handler and its consumer task. */
    private static final int DEFAULT_QUEUE_SIZE = 5000;

    /** Set to {@code true} once the queue, the consumer thread and the shutdown hook exist. */
    private boolean init = false;

    /** Hand-off buffer; created lazily by {@link #initQueue()}. */
    private BlockingQueue<MetricDTO> queue;

    /**
     * Enqueues the metric carried by the event, performing one-time initialization first if needed.
     *
     * <p>{@link BlockingQueue#put(Object)} blocks while the queue is full.
     *
     * @param messageEvent the event whose metric is enqueued
     * @throws Exception if the enqueue fails, e.g. {@link InterruptedException} when the calling
     *     thread is (or has been) interrupted
     */
    @Override // com.lmax.disruptor.WorkHandler
    public void onEvent(MessageEvent messageEvent) throws Exception {
        if (!this.init) {
            initQueue();
        }
        this.queue.put(messageEvent.getMetricDTO());
    }

    /**
     * Creates the queue (if absent), starts the consumer thread and registers the shutdown hook.
     *
     * <p>If the calling thread is interrupted during the start-up delay, the interrupt status is
     * restored instead of being swallowed, so the subsequent blocking {@code put} in
     * {@link #onEvent(MessageEvent)} observes the interruption.
     */
    private void initQueue() {
        if (this.queue == null) {
            this.queue = Queues.newArrayBlockingQueue(DEFAULT_QUEUE_SIZE);
        }

        // Random 100-499 ms pause before starting the consumer.
        // NOTE(review): presumably staggers start-up when several handlers initialize together — confirm.
        try {
            TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(100, 500));
        } catch (InterruptedException e) {
            // Preserve the interruption for callers; initialization still completes below.
            Thread.currentThread().interrupt();
            log.error("--", e);
        }

        SimpleConsumerTask consumerTask = new SimpleConsumerTask(this.queue);
        Thread consumerThread = new Thread(consumerTask);
        consumerThread.setName("sea-monitor-message-consumer-" + count.incrementAndGet());
        consumerThread.start();

        // Ask the consumer task to stop when the JVM shuts down.
        Thread shutdownHook = new Thread(consumerTask::stop);
        shutdownHook.setDaemon(true);
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        this.init = true;
    }
}
