package io.github.aooohan.mq.adapter;

import io.github.aooohan.mq.core.MessageErrorHandler;
import io.github.aooohan.mq.core.MessageOperation;
import io.github.aooohan.mq.core.MessagePublisher;
import io.github.aooohan.mq.core.listener.RedisMessageListener;
import io.github.aooohan.mq.entity.Message;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.util.Assert;

/* loaded from: input_file:io/github/aooohan/mq/adapter/StreamListenerAdapter.class */
public class StreamListenerAdapter implements StreamListener<String, ObjectRecord<String, String>> {
    private final MessageOperation messageOperation;
    private final RedisMessageListener<?> redisMessageListener;
    private final MessageErrorHandler errorHandler;

    public StreamListenerAdapter(MessageOperation messageOperation, RedisMessageListener<?> redisMessageListener) {
        Assert.notNull(messageOperation, "messageOperation must not be null");
        this.messageOperation = messageOperation;
        this.redisMessageListener = redisMessageListener;
        this.errorHandler = messageOperation;
    }

    public void onMessage(ObjectRecord<String, String> objectRecord) {
        try {
            this.redisMessageListener.onMessage(wrapperRecord(objectRecord));
        } catch (Throwable th) {
            this.errorHandler.onError((String) objectRecord.getValue(), th);
        }
    }

    private <T> Message<T> wrapperRecord(final ObjectRecord<String, String> objectRecord) throws Exception {
        final Object deserialize = this.messageOperation.deserialize((String) objectRecord.getValue());
        return new Message<T>() { // from class: io.github.aooohan.mq.adapter.StreamListenerAdapter.1
            @Override // io.github.aooohan.mq.entity.Message
            public String id() {
                return objectRecord.getId().getValue();
            }

            @Override // io.github.aooohan.mq.entity.Message
            public String groupName() {
                return StreamListenerAdapter.this.redisMessageListener.groupName();
            }

            @Override // io.github.aooohan.mq.entity.Message
            public String topic() {
                return StreamListenerAdapter.this.redisMessageListener.topicName();
            }

            @Override // io.github.aooohan.mq.entity.Message
            public boolean ack() {
                if (StreamListenerAdapter.this.redisMessageListener.autoAck()) {
                    return true;
                }
                return StreamListenerAdapter.this.messageOperation.ack(topic(), groupName(), id());
            }

            @Override // io.github.aooohan.mq.entity.Message
            public T getBody() {
                return (T) deserialize;
            }

            @Override // io.github.aooohan.mq.entity.Message
            public MessagePublisher getPublisher() {
                return StreamListenerAdapter.this.messageOperation;
            }
        };
    }
}
