package com.github.seaframework.monitor.message.disruptor;

import com.github.seaframework.core.config.ConfigurationFactory;
import com.github.seaframework.monitor.common.MonitorConst;
import com.github.seaframework.monitor.dto.MetricDTO;
import com.github.seaframework.monitor.message.MessageProducer;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/sea-monitor-1.1.0.jar:com/github/seaframework/monitor/message/disruptor/DefaultMessageProducer.class */
public class DefaultMessageProducer implements MessageProducer {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DefaultMessageProducer.class);
    private Disruptor<MessageEvent> disruptor;
    private RingBuffer<MessageEvent> ringBuffer;
    private volatile boolean init = false;
    private int DEFAULT_BUFFER_SIZE = 4096;

    @Override // com.github.seaframework.monitor.message.MessageProducer
    public void init() {
        if (this.init) {
            return;
        }
        int i = ConfigurationFactory.getInstance().getInt(MonitorConst.CONFIG_KEY_CONSUMER_COUNT, 1);
        this.disruptor = new Disruptor<>(new MessageEventFactory(), this.DEFAULT_BUFFER_SIZE, runnable -> {
            return new Thread(runnable, "sea-monitor-disruptor");
        }, ProducerType.MULTI, new LiteTimeoutBlockingWaitStrategy(500L, TimeUnit.MILLISECONDS));
        WorkHandler<MessageEvent>[] workHandlerArr = new WorkHandler[i];
        for (int i2 = 0; i2 < i; i2++) {
            workHandlerArr[i2] = new MessageConsumerHandler();
        }
        this.disruptor.handleEventsWithWorkerPool(workHandlerArr);
        this.disruptor.setDefaultExceptionHandler(new ExceptionHandler<MessageEvent>() { // from class: com.github.seaframework.monitor.message.disruptor.DefaultMessageProducer.1
            @Override // com.lmax.disruptor.ExceptionHandler
            public void handleEventException(Throwable th, long j, MessageEvent messageEvent) {
                DefaultMessageProducer.log.error("event exception =>,{},{},{}", th, Long.valueOf(j), messageEvent);
            }

            @Override // com.lmax.disruptor.ExceptionHandler
            public void handleOnStartException(Throwable th) {
                DefaultMessageProducer.log.error("on start exception", th);
            }

            @Override // com.lmax.disruptor.ExceptionHandler
            public void handleOnShutdownException(Throwable th) {
                DefaultMessageProducer.log.error("on shutdown exception", th);
            }
        });
        this.disruptor.start();
        this.ringBuffer = this.disruptor.getRingBuffer();
        this.init = true;
    }

    private void checkInit() {
        init();
    }

    @Override // com.github.seaframework.monitor.message.MessageProducer
    public void push(MetricDTO metricDTO) {
        try {
            checkInit();
            long next = this.ringBuffer.next();
            try {
                this.ringBuffer.get(next).setMetricDTO(metricDTO);
                this.ringBuffer.publish(next);
            } catch (Throwable th) {
                this.ringBuffer.publish(next);
                throw th;
            }
        } catch (Exception e) {
            log.error("fail to push metric", (Throwable) e);
        }
    }

    @Override // com.github.seaframework.monitor.message.MessageProducer
    public void shutdown() {
        this.disruptor.shutdown();
        this.init = false;
    }
}
