package com.github.rexsheng.springboot.faster.sse;

import com.github.rexsheng.springboot.faster.sse.configuration.SseProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/* loaded from: input_file:com/github/rexsheng/springboot/faster/sse/SseServer.class */
/**
 * Registry and dispatcher for Server-Sent Events (SSE) connections.
 *
 * <p>Each connection is an {@link SseEmitter} stored under a caller-supplied message id.
 * Messages can be pushed to the emitters whose id starts with a given prefix, or to all
 * registered emitters. An emitter is dropped from the registry when it completes, times
 * out, reports an error, or when sending to it fails.
 *
 * <p>The registry is a {@link ConcurrentHashMap} and the connection counter an
 * {@link AtomicInteger}; the counter is only changed when an entry is actually added to
 * or removed from the registry.
 */
public class SseServer {
    private static final Logger logger = LoggerFactory.getLogger(SseServer.class);

    /** Number of emitters currently held in {@link #sseEmitterMap}. */
    private final AtomicInteger currentConnectTotal = new AtomicInteger(0);

    /** Registered emitters keyed by message id. */
    private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    private final SseProperties sseProperties;

    /**
     * @param sseProperties configuration supplying the optional emitter timeout and the
     *                      optional client reconnect interval
     */
    public SseServer(SseProperties sseProperties) {
        this.sseProperties = sseProperties;
    }

    /**
     * Creates an emitter and registers it under {@code messageId}.
     *
     * <p>If a reconnect interval is configured it is sent to the client as the first
     * event. If another emitter is already registered under the same id it is replaced
     * and completed, and the connection count is left unchanged.
     *
     * @param messageId key under which the emitter is registered
     * @return the newly created emitter
     * @throws IOException if sending the initial reconnect-time event fails; in that case
     *                     nothing is registered
     */
    public SseEmitter createConnect(String messageId) throws IOException {
        SseEmitter emitter = this.sseProperties.getTimeout() != null
                ? new SseEmitter(Long.valueOf(this.sseProperties.getTimeout().toMillis()))
                : new SseEmitter();
        if (this.sseProperties.getReconnectInterval() != null) {
            emitter.send(SseEmitter.event().reconnectTime(this.sseProperties.getReconnectInterval().toMillis()));
        }

        // The callbacks evict by (id, emitter) pair so that a stale emitter finishing late
        // cannot remove a newer emitter that was registered under the same id.
        emitter.onCompletion(() -> {
            logger.debug("结束连接, messageId: {}", messageId);
            evict(messageId, emitter);
        });
        emitter.onTimeout(() -> {
            logger.debug("连接超时, messageId: {}", messageId);
            evict(messageId, emitter);
        });
        emitter.onError(th -> {
            logger.error("连接异常, messageId: {}", messageId, th);
            evict(messageId, emitter);
        });

        SseEmitter previous = this.sseEmitterMap.put(messageId, emitter);
        int total;
        if (previous == null) {
            total = this.currentConnectTotal.incrementAndGet();
        } else {
            // Same id registered again: the map size did not change, so neither does the
            // counter. Complete the orphaned emitter instead of leaving it open.
            total = this.currentConnectTotal.get();
            previous.complete();
        }
        logger.debug("创建sse连接成功, 当前连接总数: {}, messageId: {}", Integer.valueOf(total), messageId);
        return emitter;
    }

    /**
     * Sends {@code message} to every emitter whose id starts with {@code messageId + "_"}.
     * Emitters that fail to accept the message are removed from the registry.
     *
     * @param messageId id prefix (without the trailing underscore) selecting the recipients
     * @param message   payload to send
     */
    public void sendMessage(String messageId, String message) {
        String prefix = messageId + "_";
        this.sseEmitterMap.forEach((key, emitter) -> {
            if (key.startsWith(prefix)) {
                sendOrEvict(key, emitter, message, null, "发送消息异常, messageId: {}, 异常信息: {}", key);
            }
        });
    }

    /**
     * Sends {@code message} to every registered emitter with the
     * {@link MediaType#APPLICATION_JSON} media type. Emitters that fail to accept the
     * message are removed from the registry.
     *
     * @param message payload to send
     */
    public void batchAllSendMessage(String message) {
        this.sseEmitterMap.forEach((key, emitter) ->
                sendOrEvict(key, emitter, message, MediaType.APPLICATION_JSON,
                        "广播发送消息异常, messageId: {}, 异常信息: {}", key));
    }

    /**
     * Calls {@link #sendMessage(String, String)} once per entry of {@code messageIds}.
     * Does nothing when the list is {@code null} or empty.
     *
     * @param messageIds id prefixes selecting the recipients
     * @param message    payload to send
     */
    public void batchSendMessage(List<String> messageIds, String message) {
        if (CollectionUtils.isEmpty(messageIds)) {
            return;
        }
        for (String messageId : messageIds) {
            sendMessage(messageId, message);
        }
    }

    /**
     * Sends {@code message} to every emitter whose id starts with {@code groupId}.
     * Emitters that fail to accept the message are removed from the registry.
     *
     * @param groupId id prefix selecting the recipients
     * @param message payload to send
     */
    public void groupSendMessage(String groupId, String message) {
        this.sseEmitterMap.forEach((key, emitter) -> {
            if (key.startsWith(groupId)) {
                sendOrEvict(key, emitter, message, null, "组播发送消息异常, groupId: {}, 异常信息: {}", groupId);
            }
        });
    }

    /**
     * Removes whichever emitter is registered under {@code messageId}, completes it and
     * decrements the connection count. Does nothing but log when no emitter is registered.
     *
     * @param messageId id of the connection to drop
     */
    public void removeMessageId(String messageId) {
        SseEmitter removed = this.sseEmitterMap.remove(messageId);
        if (removed != null) {
            this.currentConnectTotal.decrementAndGet();
            removed.complete();
        }
        logger.debug("remove messageId: {}", messageId);
    }

    /**
     * @return a new list holding a copy of the currently registered message ids
     */
    public List<String> getMessageIds() {
        return new ArrayList<>(this.sseEmitterMap.keySet());
    }

    /**
     * @return the current value of the connection counter
     */
    public int getConnectTotal() {
        return this.currentConnectTotal.intValue();
    }

    /**
     * Sends {@code message} through {@code emitter}; on failure logs the error and evicts
     * that emitter so one broken connection does not abort delivery to the others.
     *
     * @param key           registry key of {@code emitter}
     * @param emitter       target emitter
     * @param message       payload to send
     * @param mediaType     media type passed to the emitter, or {@code null} for none
     * @param errorTemplate log format with two placeholders: the id and the error message
     * @param loggedId      id reported in the error log
     */
    private void sendOrEvict(String key, SseEmitter emitter, String message, MediaType mediaType,
            String errorTemplate, String loggedId) {
        try {
            emitter.send(message, mediaType);
        } catch (IOException | IllegalStateException e) {
            // IllegalStateException is what send() raises for an emitter that has already
            // completed; treat it like an I/O failure rather than letting it escape forEach.
            logger.error(errorTemplate, loggedId, e.getMessage(), e);
            evict(key, emitter);
        }
    }

    /**
     * Removes {@code emitter} from the registry only if it is still the one registered
     * under {@code messageId}, then completes it and decrements the connection count.
     */
    private void evict(String messageId, SseEmitter emitter) {
        if (this.sseEmitterMap.remove(messageId, emitter)) {
            this.currentConnectTotal.decrementAndGet();
            emitter.complete();
            logger.debug("remove messageId: {}", messageId);
        }
    }
}
