package org.enodeframework.kafka;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.enodeframework.common.io.Task;
import org.enodeframework.queue.MessageHandlerHolder;
import org.enodeframework.queue.QueueMessage;
import org.springframework.kafka.listener.BatchAcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.BatchConsumerAwareMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;

/* loaded from: input_file:org/enodeframework/kafka/KafkaBatchMessageListener.class */
public class KafkaBatchMessageListener implements BatchAcknowledgingMessageListener<String, String>, BatchConsumerAwareMessageListener<String, String>, BatchAcknowledgingConsumerAwareMessageListener<String, String> {
    private final MessageHandlerHolder messageHandlerHolder;

    public KafkaBatchMessageListener(MessageHandlerHolder messageHandlerHolder) {
        this.messageHandlerHolder = messageHandlerHolder;
    }

    private QueueMessage covertToQueueMessage(ConsumerRecord<String, String> consumerRecord) {
        String str = (String) Optional.ofNullable(consumerRecord.headers().lastHeader("ETYPE")).map(header -> {
            return new String(header.value());
        }).orElse("");
        String str2 = (String) Optional.ofNullable(consumerRecord.headers().lastHeader("ETAG")).map(header2 -> {
            return new String(header2.value());
        }).orElse("");
        QueueMessage queueMessage = new QueueMessage();
        queueMessage.setBody(((String) consumerRecord.value()).getBytes(StandardCharsets.UTF_8));
        queueMessage.setType(str);
        queueMessage.setTag(str2);
        queueMessage.setTopic(consumerRecord.topic());
        queueMessage.setRouteKey((String) consumerRecord.key());
        queueMessage.setKey((String) consumerRecord.key());
        return queueMessage;
    }

    public void onMessage(@NonNull List<ConsumerRecord<String, String>> list) {
        onMessage(list, (Acknowledgment) null, (Consumer<?, ?>) null);
    }

    public void onMessage(@NonNull List<ConsumerRecord<String, String>> list, @Nullable Acknowledgment acknowledgment) {
        onMessage(list, acknowledgment, (Consumer<?, ?>) null);
    }

    public void onMessage(@NonNull List<ConsumerRecord<String, String>> list, @Nullable Consumer<?, ?> consumer) {
        onMessage(list, (Acknowledgment) null, consumer);
    }

    public void onMessage(@NonNull List<ConsumerRecord<String, String>> list, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer) {
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        list.forEach(consumerRecord -> {
            QueueMessage covertToQueueMessage = covertToQueueMessage(consumerRecord);
            this.messageHandlerHolder.chooseMessageHandler(covertToQueueMessage.getType()).handle(covertToQueueMessage, queueMessage -> {
                countDownLatch.countDown();
            });
        });
        Task.await(countDownLatch);
        if (acknowledgment != null) {
            acknowledgment.acknowledge();
        }
    }

    public /* bridge */ /* synthetic */ void onMessage(@NonNull Object obj, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer) {
        onMessage((List<ConsumerRecord<String, String>>) obj, acknowledgment, (Consumer<?, ?>) consumer);
    }

    public /* bridge */ /* synthetic */ void onMessage(@NonNull Object obj, @Nullable Consumer consumer) {
        onMessage((List<ConsumerRecord<String, String>>) obj, (Consumer<?, ?>) consumer);
    }
}
