package io.github.opensabe.spring.boot.starter.rocketmq;

import io.github.opensabe.common.entity.base.vo.BaseMQMessage;
import io.github.opensabe.common.observation.UnifiedObservationFactory;
import io.github.opensabe.common.utils.json.JsonUtil;
import io.github.opensabe.spring.boot.starter.rocketmq.jfr.MessageConsume;
import io.micrometer.observation.Observation;
import io.micrometer.tracing.TraceContext;
import jakarta.annotation.Nonnull;
import java.util.concurrent.CountDownLatch;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.env.Environment;

/* loaded from: input_file:io/github/opensabe/spring/boot/starter/rocketmq/AbstractMQConsumer.class */
public abstract class AbstractMQConsumer implements RocketMQListener<String>, ApplicationListener<ApplicationReadyEvent> {
    private static final Logger log = LogManager.getLogger(AbstractMQConsumer.class);

    @Autowired
    private UnifiedObservationFactory unifiedObservationFactory;

    @Autowired
    protected Environment environment;
    protected String topic;
    private final CountDownLatch cdl = new CountDownLatch(1);
    private volatile boolean isStarted = false;

    @PostConstruct
    public void init() {
        this.topic = this.environment.resolvePlaceholders(getClass().getAnnotation(RocketMQMessageListener.class).topic());
    }

    public void onMessage(String str) {
        if (!this.isStarted) {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                log.info("AbstractMQConsumer-onMessage await for ApplicationReadyEvent...");
                this.cdl.await();
                log.info("AbstractMQConsumer-onMessage await complete in {}ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            } catch (Throwable th) {
                log.error("MQ consume countDownLatch interrupted!", th);
            }
        }
        BaseMQMessage decode = MQMessageUtil.decode((BaseMQMessage) JsonUtil.parseObject(str, BaseMQMessage.class));
        Observation createEmptyObservation = this.unifiedObservationFactory.createEmptyObservation();
        TraceContext traceContext = UnifiedObservationFactory.getTraceContext(createEmptyObservation);
        if (StringUtils.isBlank(decode.getData())) {
            MessageConsume messageConsume = new MessageConsume(traceContext.traceId(), traceContext.traceId(), traceContext.spanId(), this.topic);
            messageConsume.begin();
            createEmptyObservation.observe(() -> {
                try {
                    BaseMQMessage baseMQMessage = new BaseMQMessage();
                    baseMQMessage.setData(str);
                    log.info("AbstractMQConsumer-onMessage: topic: {} -> message: {}", this.topic, str);
                    try {
                        onBaseMQMessage(baseMQMessage);
                        messageConsume.setSuccessful(true);
                    } finally {
                    }
                } finally {
                    messageConsume.commit();
                }
            });
        } else {
            MessageConsume messageConsume2 = new MessageConsume(decode.getTraceId(), traceContext.traceId(), traceContext.spanId(), this.topic);
            messageConsume2.begin();
            createEmptyObservation.observe(() -> {
                try {
                    log.info("AbstractMQConsumer-onMessage: topic: initial trace id {}, topic: {} -> message: {}", decode.getTraceId(), this.topic, str);
                    try {
                        onBaseMQMessage(decode);
                        messageConsume2.setSuccessful(true);
                    } finally {
                    }
                } finally {
                    messageConsume2.commit();
                }
            });
        }
    }

    public void onApplicationEvent(@Nonnull ApplicationReadyEvent applicationReadyEvent) {
        this.cdl.countDown();
        this.isStarted = true;
    }

    protected abstract void onBaseMQMessage(BaseMQMessage baseMQMessage);
}
