package io.github.aooohan.mq.adapter;

import io.github.aooohan.mq.core.DefaultRedisMqPublisher;
import io.github.aooohan.mq.core.MessageErrorHandler;
import io.github.aooohan.mq.core.RedisMqPublisher;
import io.github.aooohan.mq.core.listener.AnnMessageListener;
import io.github.aooohan.mq.core.listener.MessageListener;
import io.github.aooohan.mq.core.wrapper.MessageListenerWrapper;
import io.github.aooohan.mq.entity.AckObjectRecord;
import io.github.aooohan.mq.serializer.MessageSerializer;
import java.util.Objects;
import org.springframework.core.ResolvableType;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;

/* loaded from: input_file:io/github/aooohan/mq/adapter/StreamListenerAdapter.class */
public class StreamListenerAdapter implements StreamListener<String, ObjectRecord<String, String>> {
    private final RedisTemplate<String, String> redisTemplate;
    private final MessageListener<?> messageListener;
    private final Class<?> paramClass;
    private final MessageSerializer messageSerializer;
    private final MessageErrorHandler errorHandler;
    private final RedisMqPublisher redisMqPublisher;

    public StreamListenerAdapter(RedisTemplate<String, String> redisTemplate, MessageListener<?> messageListener) {
        this.redisTemplate = redisTemplate;
        this.messageListener = messageListener;
        this.messageSerializer = messageListener.serializer();
        this.errorHandler = messageListener.errorHandler();
        if (messageListener instanceof MessageListenerWrapper) {
            this.paramClass = getParamClass(((MessageListenerWrapper) messageListener).getTarget());
        } else {
            this.paramClass = getParamClass(messageListener);
        }
        this.redisMqPublisher = new DefaultRedisMqPublisher(this.messageSerializer, redisTemplate);
    }

    private Class<?> getParamClass(MessageListener<?> messageListener) {
        return messageListener instanceof AnnMessageListener ? ((AnnMessageListener) messageListener).getParameterizedType() : ResolvableType.forInstance(messageListener).as(MessageListener.class).resolveGeneric(new int[]{0});
    }

    public void onMessage(ObjectRecord<String, String> objectRecord) {
        try {
            this.messageListener.onMessage(wrapperRecord(objectRecord));
        } catch (Throwable th) {
            this.errorHandler.handle(this.messageListener.topicName(), this.messageListener.groupName(), (String) objectRecord.getValue(), th);
        }
    }

    private <T> AckObjectRecord<T> wrapperRecord(final ObjectRecord<String, String> objectRecord) throws Exception {
        final Object deserialize = this.messageSerializer.deserialize((String) objectRecord.getValue(), this.paramClass);
        return new AckObjectRecord<T>() { // from class: io.github.aooohan.mq.adapter.StreamListenerAdapter.1
            @Override // io.github.aooohan.mq.entity.AckObjectRecord
            public String id() {
                return objectRecord.getId().getValue();
            }

            @Override // io.github.aooohan.mq.entity.AckObjectRecord
            public String groupName() {
                return StreamListenerAdapter.this.messageListener.groupName();
            }

            @Override // io.github.aooohan.mq.entity.AckObjectRecord
            public String topic() {
                return StreamListenerAdapter.this.messageListener.topicName();
            }

            @Override // io.github.aooohan.mq.entity.AckObjectRecord
            public boolean ack() {
                Long l = 1L;
                return l.equals(StreamListenerAdapter.this.redisTemplate.opsForStream().acknowledge(Objects.requireNonNull(topic()), groupName(), new String[]{id()}));
            }

            @Override // io.github.aooohan.mq.entity.AckObjectRecord
            public T getBody() {
                return (T) deserialize;
            }

            @Override // io.github.aooohan.mq.entity.AckObjectRecord
            public RedisMqPublisher getPublisher() {
                return StreamListenerAdapter.this.redisMqPublisher;
            }
        };
    }
}
