package io.github.aooohan.mq.adapter;

import io.github.aooohan.mq.core.RedisMqErrorHandler;
import io.github.aooohan.mq.core.RedisMqOperation;
import io.github.aooohan.mq.core.RedisMqPublisher;
import io.github.aooohan.mq.core.listener.RedisMqListener;
import io.github.aooohan.mq.entity.MqContent;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.util.Assert;

/* loaded from: input_file:io/github/aooohan/mq/adapter/StreamListenerAdapter.class */
/**
 * Adapts a {@link RedisMqListener} to Spring Data Redis' {@link StreamListener} contract.
 *
 * <p>Each incoming {@link ObjectRecord} is wrapped in an {@link MqContent} view and handed to
 * the listener. Anything thrown while wrapping or while the listener runs is passed to the
 * {@link RedisMqErrorHandler}, which is the same {@link RedisMqOperation} instance supplied to
 * the constructor.
 */
public class StreamListenerAdapter implements StreamListener<String, ObjectRecord<String, String>> {
    private final RedisMqOperation messageOperation;
    private final RedisMqListener<?> redisMqListener;
    private final RedisMqErrorHandler errorHandler;

    /**
     * Creates an adapter that dispatches stream records to {@code redisMqListener}.
     *
     * @param redisMqOperation operation object used for deserialization, acknowledgement,
     *                         publishing and error handling; must not be {@code null}
     * @param redisMqListener  listener that receives the wrapped messages; must not be
     *                         {@code null}
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    public StreamListenerAdapter(RedisMqOperation redisMqOperation, RedisMqListener<?> redisMqListener) {
        Assert.notNull(redisMqOperation, "messageOperation must not be null");
        // Fail fast: without this check a null listener would only surface as an NPE
        // on every received message.
        Assert.notNull(redisMqListener, "redisMqListener must not be null");
        this.messageOperation = redisMqOperation;
        this.redisMqListener = redisMqListener;
        this.errorHandler = redisMqOperation;
    }

    /**
     * Wraps the record and forwards it to the listener.
     *
     * <p>Any {@link Throwable} raised during wrapping (including deserialization) or by the
     * listener is reported to the error handler together with the raw record value.
     *
     * @param objectRecord the record received from the stream
     */
    @Override
    public void onMessage(ObjectRecord<String, String> objectRecord) {
        try {
            this.redisMqListener.onMessage(wrapperRecord(objectRecord));
        } catch (Throwable th) {
            // Deliberately broad: this is the dispatch boundary, every failure goes to the handler.
            this.errorHandler.onError(objectRecord.getValue(), th);
        }
    }

    /**
     * Builds an {@link MqContent} view over the given record.
     *
     * <p>The record value is deserialized once, eagerly, before the view is created; the
     * resulting object is what {@link MqContent#getBody()} returns.
     *
     * @param objectRecord the raw stream record
     * @param <T>          body type expected by the listener
     * @return a content view backed by the record, the listener and the operation object
     * @throws Exception declared because deserialization may fail
     */
    private <T> MqContent<T> wrapperRecord(final ObjectRecord<String, String> objectRecord) throws Exception {
        final Object deserialize = this.messageOperation.deserialize(objectRecord.getValue());
        return new MqContent<T>() {
            @Override
            public String id() {
                return objectRecord.getId().getValue();
            }

            @Override
            public String groupName() {
                return StreamListenerAdapter.this.redisMqListener.groupName();
            }

            @Override
            public String topic() {
                return StreamListenerAdapter.this.redisMqListener.topicName();
            }

            @Override
            public boolean ack() {
                // When the listener reports auto-ack, no explicit acknowledgement is issued here.
                if (StreamListenerAdapter.this.redisMqListener.autoAck()) {
                    return true;
                }
                return StreamListenerAdapter.this.messageOperation.ack(topic(), groupName(), id());
            }

            @Override
            @SuppressWarnings("unchecked") // body type is determined by the deserializer, not checkable here
            public T getBody() {
                return (T) deserialize;
            }

            @Override
            public RedisMqPublisher getPublisher() {
                return StreamListenerAdapter.this.messageOperation;
            }
        };
    }
}
