package io.codeed.redisqueue;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

/* loaded from: input_file:io/codeed/redisqueue/RedisQueue.class */
public class RedisQueue {
    private static final Logger log = LogManager.getLogger(RedisQueue.class);
    private static final String KEY_STREAM_DATA = "_streamData";
    private static final String KEY_REQUEST_ID = "_requestId";
    private static final String KEY_ERROR = "_error";
    private final RedisTemplate<String, String> redisTemplate;
    private final String requestConsumerGroupName;
    private final String requestQueueName;
    private final String responseQueueName;
    private final String responseConsumerGroupName;
    private final StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
    private final ConcurrentSkipListSet<String> ongoingRequest = new ConcurrentSkipListSet<>();
    private final Map<String, CompletableFuture<String>> responseMap = new ConcurrentHashMap();
    private boolean inited = false;

    public RedisQueue(RedisTemplate<String, String> redisTemplate, String str, StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer) {
        this.redisTemplate = redisTemplate;
        this.requestQueueName = "request-stream-" + str;
        this.responseQueueName = "response-stream-" + str;
        this.container = streamMessageListenerContainer;
        this.requestConsumerGroupName = "request-stream-cg-" + str;
        this.responseConsumerGroupName = "response-stream-cg-" + str;
    }

    public void init(boolean z, boolean z2, RequestProcessor requestProcessor) {
        if (this.inited) {
            return;
        }
        if (z) {
            initResponseListener();
        }
        if (z2) {
            initRequestProcessListener(requestProcessor);
        }
        this.inited = true;
    }

    private Map<String, String> mapOfData(String str, String str2, Exception exc) {
        HashMap hashMap = new HashMap();
        hashMap.put(KEY_STREAM_DATA, str2);
        hashMap.put(KEY_REQUEST_ID, str);
        hashMap.put(KEY_ERROR, exc == null ? "" : (String) StringUtils.firstNonEmpty(new String[]{ExceptionUtils.getRootCauseMessage(exc), "内部错误"}));
        return hashMap;
    }

    private void initRequestProcessListener(RequestProcessor requestProcessor) {
        this.container.receive(Consumer.from(this.requestConsumerGroupName, RandomStringUtils.randomAlphabetic(5)), StreamOffset.create(this.requestQueueName, ReadOffset.lastConsumed()), mapRecord -> {
            String str = (String) ((Map) mapRecord.getValue()).get(KEY_REQUEST_ID);
            String str2 = (String) ((Map) mapRecord.getValue()).get(KEY_STREAM_DATA);
            String str3 = null;
            try {
                str3 = requestProcessor.process(str, str2);
                this.redisTemplate.opsForStream().add(this.responseQueueName, mapOfData(str, str3, null));
            } catch (Exception e) {
                requestProcessor.handleError(str, str2, e);
                this.redisTemplate.opsForStream().add(this.responseQueueName, mapOfData(str, str3, e));
            }
            this.redisTemplate.opsForStream().acknowledge(this.responseQueueName, this.requestConsumerGroupName, new RecordId[]{mapRecord.getId()});
        });
    }

    public void enqueue(String str) throws Exception {
        enqueueAndWaitResponse(str, -1);
    }

    public String enqueueAndWaitResponse(String str, int i) throws Exception {
        if (!this.inited) {
            throw new IllegalStateException("请先调用init方法");
        }
        String uuid = UUID.randomUUID().toString();
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        this.responseMap.put(uuid, completableFuture);
        this.ongoingRequest.add(uuid);
        RecordId add = this.redisTemplate.opsForStream().add(this.requestQueueName, mapOfData(uuid, str, null));
        log.info("加入队列：{}， {}", add.getValue(), StringUtils.abbreviate(str, 256));
        try {
            if (i <= 0) {
                return null;
            }
            try {
                String str2 = completableFuture.get(i, TimeUnit.SECONDS);
                this.responseMap.remove(uuid);
                this.ongoingRequest.remove(uuid);
                return str2;
            } catch (ExecutionException e) {
                log.error("等待响应执行错误：{}, {}", uuid, add.getValue(), e);
                throw e;
            } catch (TimeoutException e2) {
                log.error("等待响应错误：{}, {}", uuid, add.getValue(), e2);
                throw e2;
            }
        } catch (Throwable th) {
            this.responseMap.remove(uuid);
            this.ongoingRequest.remove(uuid);
            throw th;
        }
    }

    private void initResponseListener() {
        this.container.receive(Consumer.from(this.responseConsumerGroupName, RandomStringUtils.randomAlphabetic(5)), StreamOffset.create(this.responseQueueName, ReadOffset.lastConsumed()), mapRecord -> {
            String str = (String) ((Map) mapRecord.getValue()).get(KEY_REQUEST_ID);
            String str2 = (String) ((Map) mapRecord.getValue()).get(KEY_STREAM_DATA);
            CompletableFuture<String> completableFuture = this.responseMap.get(str);
            if (completableFuture != null) {
                completableFuture.complete(str2);
            }
            this.redisTemplate.opsForStream().acknowledge(this.responseQueueName, this.responseConsumerGroupName, new RecordId[]{mapRecord.getId()});
        });
    }
}
