package com.dxy.library.springboot.kafka.rest;

import com.dxy.common.util.ExecutorUtil;
import com.dxy.library.springboot.kafka.constant.KafkaTopic;
import com.dxy.library.springboot.kafka.producer.KafkaProducer;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller plus Kafka listener used to smoke-test the Kafka integration.
 *
 * <p>{@code POST /kafka/test} hands several sender tasks to {@link ExecutorUtil}; each task
 * publishes a fixed number of identical messages to {@link KafkaTopic#TOPIC_TEST}. The
 * {@link #consume(List, Acknowledgment)} listener on the same topic logs every received value
 * and then acknowledges the batch.
 */
@RequestMapping(path = {"/kafka"})
@RestController
public class KafkaTestRest {
    private static final Logger log = LoggerFactory.getLogger(KafkaTestRest.class);

    /** Number of sender tasks handed to the executor for each request. */
    private static final int SENDER_TASK_COUNT = 4;

    /** Number of messages published by each sender task. */
    private static final int MESSAGES_PER_TASK = 250000;

    /** Payload used for every test message. */
    private static final String TEST_MESSAGE = "测试消息队列";

    @Autowired
    private KafkaProducer kafkaProducer;

    /**
     * Submits {@value #SENDER_TASK_COUNT} sender tasks, each publishing
     * {@value #MESSAGES_PER_TASK} test messages to {@link KafkaTopic#TOPIC_TEST}.
     *
     * @return the fixed confirmation string {@code "发送成功"}
     */
    @PostMapping(path = {"/test"})
    public String testKafka() {
        log.info("开始发送消息");
        for (int task = 0; task < SENDER_TASK_COUNT; task++) {
            ExecutorUtil.getInstance().execute(this::sendTestMessages);
        }
        // NOTE(review): this is logged once the tasks have been submitted. If ExecutorUtil runs
        // them asynchronously (presumably it does - confirm), the messages may still be in
        // flight at this point, so neither this log line nor the response proves delivery.
        log.info("消息发送完成");
        return "发送成功";
    }

    /**
     * Publishes {@value #MESSAGES_PER_TASK} test messages from the calling thread.
     *
     * <p>A runtime failure stops this task's remaining sends, exactly as an uncaught exception
     * would, but it is logged here together with the number of messages already sent so the
     * failure is visible in the application log instead of depending on how the executor
     * treats uncaught exceptions.
     */
    private void sendTestMessages() {
        int sent = 0;
        try {
            for (; sent < MESSAGES_PER_TASK; sent++) {
                this.kafkaProducer.send(KafkaTopic.TOPIC_TEST, TEST_MESSAGE);
            }
        } catch (RuntimeException e) {
            log.error("消息发送失败，已发送{}条后中止", sent, e);
        }
    }

    /**
     * Logs the value of every record in the received batch, then acknowledges the batch.
     *
     * <p>If logging a record throws, the acknowledgment call is not reached.
     * NOTE(review): receiving a {@code List} of records presumably requires the listener
     * container to be configured for batch consumption with manual acknowledgment - confirm
     * against the Kafka configuration, which is not visible here.
     *
     * @param list the records delivered for {@link KafkaTopic#TOPIC_TEST}
     * @param acknowledgment handle used to acknowledge the batch once it has been logged
     */
    @KafkaListener(topics = {KafkaTopic.TOPIC_TEST})
    public void consume(List<ConsumerRecord<String, String>> list, Acknowledgment acknowledgment) {
        for (ConsumerRecord<String, String> consumerRecord : list) {
            log.info("消息内容：{}", consumerRecord.value());
        }
        acknowledgment.acknowledge();
    }
}
