package io.github.guoshiqiufeng.loki.support.kafka.utils;

import io.github.guoshiqiufeng.loki.support.core.consumer.ConsumerRecord;
import io.github.guoshiqiufeng.loki.support.core.pipeline.PipelineUtils;
import io.github.guoshiqiufeng.loki.support.kafka.consumer.KafkaConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/guoshiqiufeng/loki/support/kafka/utils/KafkaConsumeUtils.class */
public final class KafkaConsumeUtils {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumeUtils.class);

    public static void consumeMessageForPattern(KafkaConsumer<String, String> kafkaConsumer, String str, String str2, Function<ConsumerRecord, Void> function) {
        try {
            try {
                kafkaConsumer.subscribe(Pattern.compile(str));
                while (true) {
                    kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE)).forEach(consumerRecord -> {
                        ConsumerRecord processListener;
                        KafkaConsumerRecord kafkaConsumerRecord = new KafkaConsumerRecord(consumerRecord);
                        if (str2 == null || str2.isEmpty() || "*".equals(str2)) {
                            ConsumerRecord processListener2 = PipelineUtils.processListener(covertConsumerRecord(kafkaConsumerRecord));
                            if (processListener2 != null) {
                                function.apply(processListener2);
                                return;
                            }
                            return;
                        }
                        if (!str2.equals(kafkaConsumerRecord.tag()) || (processListener = PipelineUtils.processListener(covertConsumerRecord(kafkaConsumerRecord))) == null) {
                            return;
                        }
                        function.apply(processListener);
                    });
                }
            } catch (WakeupException e) {
                kafkaConsumer.close();
            } catch (Exception e2) {
                if (log.isErrorEnabled()) {
                    log.error("Unexpected error", e2);
                }
                kafkaConsumer.close();
            }
        } catch (Throwable th) {
            kafkaConsumer.close();
            throw th;
        }
    }

    public static void consumeMessage(KafkaConsumer<String, String> kafkaConsumer, String str, String str2, Function<ConsumerRecord, Void> function) {
        try {
            try {
                kafkaConsumer.subscribe(Collections.singletonList(str));
                while (true) {
                    kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE)).forEach(consumerRecord -> {
                        ConsumerRecord processListener;
                        KafkaConsumerRecord kafkaConsumerRecord = new KafkaConsumerRecord(consumerRecord);
                        if (str2 == null || str2.isEmpty() || "*".equals(str2)) {
                            ConsumerRecord processListener2 = PipelineUtils.processListener(covertConsumerRecord(kafkaConsumerRecord));
                            if (processListener2 != null) {
                                function.apply(processListener2);
                                return;
                            }
                            return;
                        }
                        if (!str2.equals(kafkaConsumerRecord.tag()) || (processListener = PipelineUtils.processListener(covertConsumerRecord(kafkaConsumerRecord))) == null) {
                            return;
                        }
                        function.apply(processListener);
                    });
                }
            } catch (WakeupException e) {
                kafkaConsumer.close();
            } catch (Exception e2) {
                if (log.isErrorEnabled()) {
                    log.error("Unexpected error", e2);
                }
                kafkaConsumer.close();
            }
        } catch (Throwable th) {
            kafkaConsumer.close();
            throw th;
        }
    }

    private static ConsumerRecord covertConsumerRecord(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
        return new ConsumerRecord(kafkaConsumerRecord.topic(), kafkaConsumerRecord.tag(), getMsgId(kafkaConsumerRecord), (String) null, Collections.singletonList((String) kafkaConsumerRecord.key()), (String) kafkaConsumerRecord.value());
    }

    private static String getMsgId(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
        return kafkaConsumerRecord.partition() + "_" + kafkaConsumerRecord.offset();
    }

    private KafkaConsumeUtils() {
        throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
    }
}
