package com.github.sseserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/* loaded from: input_file:com/github/sseserver/LocalConnectionServiceImpl.class */
public class LocalConnectionServiceImpl implements LocalConnectionService, BeanNameAware, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(LocalConnectionServiceImpl.class);
    private static final AtomicInteger SCHEDULED_INDEX = new AtomicInteger();
    protected final Map<String, Set<Long>> accessToken2ConnectionIdMap = new ConcurrentHashMap();
    protected final Map<String, Set<Long>> channel2ConnectionIdMap = new ConcurrentHashMap();
    protected final Map<String, Set<Long>> customerId2ConnectionIdMap = new ConcurrentHashMap();
    protected final Map<String, Set<String>> userId2AccessTokenMap = new ConcurrentHashMap();
    protected final Map<Long, SseEmitter> connectionMap = new ConcurrentHashMap();
    protected final List<Consumer<SseEmitter>> connectListeners = new ArrayList();
    protected final List<Consumer<SseEmitter>> disconnectListeners = new ArrayList();
    protected final Map<String, List<Predicate<SseEmitter>>> connectListenerMap = new ConcurrentHashMap();
    protected final Map<String, List<Predicate<SseEmitter>>> disconnectListenerMap = new ConcurrentHashMap();
    private String beanName = getClass().getSimpleName();
    private final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1, runnable -> {
        return new Thread(runnable, getBeanName() + "-" + SCHEDULED_INDEX.incrementAndGet());
    });
    private int reconnectTime = 5000;
    private boolean destroyFlag;

    @Override // com.github.sseserver.LocalConnectionService
    public <ACCESS_USER extends AccessUser & AccessToken> SseEmitter<ACCESS_USER> connect(ACCESS_USER access_user, Long l) {
        return connect(access_user, l, null);
    }

    @Override // com.github.sseserver.LocalConnectionService
    public <ACCESS_USER extends AccessUser & AccessToken> SseEmitter<ACCESS_USER> connect(ACCESS_USER access_user, Long l, Map<String, Object> map) {
        if (this.destroyFlag) {
            throw new IllegalStateException("destroy");
        }
        if (l == null) {
            l = 900000L;
        }
        SseEmitter<ACCESS_USER> sseEmitter = new SseEmitter<>(l, access_user);
        sseEmitter.onCompletion(completionCallBack(sseEmitter));
        sseEmitter.onError(errorCallBack(sseEmitter));
        sseEmitter.onTimeout(timeoutCallBack(sseEmitter));
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = this.scheduled;
        sseEmitter.getClass();
        sseEmitter.setTimeoutCheckFuture(scheduledThreadPoolExecutor.schedule(sseEmitter::disconnectByTimeoutCheck, l.longValue() + 1000, TimeUnit.MILLISECONDS));
        Long valueOf = Long.valueOf(sseEmitter.getId());
        String wrapStringKey = wrapStringKey(sseEmitter.getAccessToken());
        String wrapStringKey2 = wrapStringKey(sseEmitter.getUserId());
        String wrapStringKey3 = wrapStringKey(sseEmitter.getCustomerId());
        sseEmitter.addDisConnectListener(sseEmitter2 -> {
            log.debug("sse {} connection disconnect : {}", this.beanName, sseEmitter2);
            String wrapStringKey4 = wrapStringKey(sseEmitter2.getChannel());
            notifyListener(sseEmitter2, this.disconnectListeners, this.disconnectListenerMap);
            synchronized (this) {
                this.connectionMap.remove(valueOf);
                Set<Long> set = this.accessToken2ConnectionIdMap.get(wrapStringKey);
                if (set != null) {
                    set.remove(valueOf);
                    if (set.isEmpty()) {
                        this.accessToken2ConnectionIdMap.remove(wrapStringKey);
                    }
                }
                Set<String> set2 = this.userId2AccessTokenMap.get(wrapStringKey2);
                if (set2 != null) {
                    set2.remove(wrapStringKey);
                    if (set2.isEmpty()) {
                        this.userId2AccessTokenMap.remove(wrapStringKey2);
                    }
                }
                Set<Long> set3 = this.customerId2ConnectionIdMap.get(wrapStringKey3);
                if (set3 != null) {
                    set3.remove(valueOf);
                    if (set3.isEmpty()) {
                        this.customerId2ConnectionIdMap.remove(wrapStringKey3);
                    }
                }
                Set<Long> set4 = this.channel2ConnectionIdMap.get(wrapStringKey4);
                if (set4 != null) {
                    set4.remove(valueOf);
                    if (set4.isEmpty()) {
                        this.channel2ConnectionIdMap.remove(wrapStringKey4);
                    }
                }
            }
        });
        sseEmitter.addConnectListener(sseEmitter3 -> {
            this.channel2ConnectionIdMap.computeIfAbsent(wrapStringKey(sseEmitter3.getChannel()), str -> {
                return Collections.newSetFromMap(new ConcurrentHashMap(3));
            }).add(Long.valueOf(sseEmitter3.getId()));
            log.debug("sse {} connection create : {}", this.beanName, sseEmitter3);
            notifyListener(sseEmitter3, this.connectListeners, this.connectListenerMap);
        });
        this.connectionMap.put(valueOf, sseEmitter);
        this.accessToken2ConnectionIdMap.computeIfAbsent(wrapStringKey, str -> {
            return Collections.newSetFromMap(new ConcurrentHashMap(3));
        }).add(valueOf);
        this.customerId2ConnectionIdMap.computeIfAbsent(wrapStringKey3, str2 -> {
            return Collections.newSetFromMap(new ConcurrentHashMap(3));
        }).add(valueOf);
        this.userId2AccessTokenMap.computeIfAbsent(wrapStringKey2, str3 -> {
            return Collections.newSetFromMap(new ConcurrentHashMap(3));
        }).add(wrapStringKey);
        if (map != null) {
            sseEmitter.getAttributeMap().putAll(map);
        }
        try {
            sseEmitter.send(SseEmitter.event().reconnectTime(this.reconnectTime).name("connect-finish").data("{\"connectionId\":" + valueOf + ",\"serverTime\":" + System.currentTimeMillis() + ",\"reconnectTime\":" + this.reconnectTime + ",\"name\":\"" + this.beanName + "\"}"));
            return sseEmitter;
        } catch (IOException e) {
            log.error("sse {} send {} IOException:{}", new Object[]{this.beanName, sseEmitter, e.toString(), e});
            return null;
        }
    }

    @Override // com.github.sseserver.LocalConnectionService
    public SseEmitter disconnectByConnectionId(Long l) {
        SseEmitter connectionById = getConnectionById(l);
        if (connectionById == null || !connectionById.disconnect()) {
            return null;
        }
        return connectionById;
    }

    @Override // com.github.sseserver.LocalConnectionService
    public List<SseEmitter> disconnectByAccessToken(String str) {
        List<SseEmitter> connectionByAccessToken = getConnectionByAccessToken(str);
        ArrayList arrayList = new ArrayList();
        if (connectionByAccessToken != null) {
            for (SseEmitter sseEmitter : connectionByAccessToken) {
                if (sseEmitter.disconnect()) {
                    arrayList.add(sseEmitter);
                }
            }
        }
        return arrayList;
    }

    @Override // com.github.sseserver.LocalConnectionService
    public List<SseEmitter> disconnectByUserId(Object obj) {
        List<SseEmitter> connectionByUserId = getConnectionByUserId(obj);
        ArrayList arrayList = new ArrayList();
        if (connectionByUserId != null) {
            for (SseEmitter sseEmitter : connectionByUserId) {
                if (sseEmitter.disconnect()) {
                    arrayList.add(sseEmitter);
                }
            }
        }
        return arrayList;
    }

    @Override // com.github.sseserver.LocalConnectionService
    public List<SseEmitter> getConnectionAll() {
        return new ArrayList(this.connectionMap.values());
    }

    @Override // com.github.sseserver.LocalConnectionService
    public SseEmitter getConnectionById(Long l) {
        if (l == null) {
            return null;
        }
        return this.connectionMap.get(l);
    }

    @Override // com.github.sseserver.LocalConnectionService
    public List<SseEmitter> getConnectionByChannel(String str) {
        Set<Long> set = this.channel2ConnectionIdMap.get(wrapStringKey(str));
        return (set == null || set.isEmpty()) ? Collections.emptyList() : (List) set.stream().map(this::getConnectionById).filter((v0) -> {
            return Objects.nonNull(v0);
        }).distinct().collect(Collectors.toList());
    }

    @Override // com.github.sseserver.LocalConnectionService
    public List<SseEmitter> getConnectionByAccessToken(String str) {
        Set<Long> set = this.accessToken2ConnectionIdMap.get(wrapStringKey(str));
        return (set == null || set.isEmpty()) ? Collections.emptyList() : (List) set.stream().map(this::getConnectionById).filter((v0) -> {
            return Objects.nonNull(v0);
        }).distinct().collect(Collectors.toList());
    }

    @Override // com.github.sseserver.LocalConnectionService
    public List<SseEmitter> getConnectionByCustomerId(Object obj) {
        Set<Long> set = this.customerId2ConnectionIdMap.get(wrapStringKey(obj));
        return (set == null || set.isEmpty()) ? Collections.emptyList() : (List) set.stream().map(this::getConnectionById).filter((v0) -> {
            return Objects.nonNull(v0);
        }).distinct().collect(Collectors.toList());
    }

    @Override // com.github.sseserver.LocalConnectionService
    public List<SseEmitter> getConnectionByUserId(Object obj) {
        Set<String> set = this.userId2AccessTokenMap.get(wrapStringKey(obj));
        return (set == null || set.isEmpty()) ? Collections.emptyList() : (List) set.stream().map(this::getConnectionByAccessToken).flatMap((v0) -> {
            return v0.stream();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).distinct().collect(Collectors.toList());
    }

    @Override // com.github.sseserver.LocalConnectionService
    public <ACCESS_USER extends AccessUser & AccessToken> void addConnectListener(String str, String str2, Consumer<SseEmitter<ACCESS_USER>> consumer) {
        List<SseEmitter> connectionByAccessToken = getConnectionByAccessToken(str);
        if (connectionByAccessToken != null) {
            for (SseEmitter<ACCESS_USER> sseEmitter : connectionByAccessToken) {
                if (sseEmitter.isConnect() && Objects.equals(str2, sseEmitter.getChannel())) {
                    consumer.accept(sseEmitter);
                }
            }
        }
        this.connectListenerMap.computeIfAbsent(str, str3 -> {
            return new ArrayList();
        }).add(sseEmitter2 -> {
            if (!Objects.equals(str2, sseEmitter2.getChannel())) {
                return false;
            }
            consumer.accept(sseEmitter2);
            return true;
        });
    }

    @Override // com.github.sseserver.LocalConnectionService
    public <ACCESS_USER extends AccessUser & AccessToken> void addConnectListener(String str, Consumer<SseEmitter<ACCESS_USER>> consumer) {
        List<SseEmitter> connectionByAccessToken = getConnectionByAccessToken(str);
        if (connectionByAccessToken != null) {
            for (SseEmitter<ACCESS_USER> sseEmitter : connectionByAccessToken) {
                if (sseEmitter.isConnect()) {
                    consumer.accept(sseEmitter);
                }
            }
        }
        this.connectListenerMap.computeIfAbsent(str, str2 -> {
            return new ArrayList();
        }).add(sseEmitter2 -> {
            consumer.accept(sseEmitter2);
            return true;
        });
    }

    @Override // com.github.sseserver.LocalConnectionService
    public <ACCESS_USER extends AccessUser & AccessToken> void addConnectListener(Consumer<SseEmitter<ACCESS_USER>> consumer) {
        this.connectListeners.add(consumer);
    }

    @Override // com.github.sseserver.LocalConnectionService
    public <ACCESS_USER extends AccessUser & AccessToken> void addDisConnectListener(Consumer<SseEmitter<ACCESS_USER>> consumer) {
        this.disconnectListeners.add(consumer);
    }

    @Override // com.github.sseserver.LocalConnectionService
    public <ACCESS_USER extends AccessUser & AccessToken> void addDisConnectListener(String str, Consumer<SseEmitter<ACCESS_USER>> consumer) {
        this.disconnectListenerMap.computeIfAbsent(str, str2 -> {
            return new ArrayList();
        }).add(sseEmitter -> {
            consumer.accept(sseEmitter);
            return true;
        });
    }

    @Override // com.github.sseserver.LocalConnectionService
    public int send(Collection<SseEmitter> collection, SseEmitter.SseEventBuilder sseEventBuilder) {
        int i = 0;
        Iterator<SseEmitter> it = collection.iterator();
        while (it.hasNext()) {
            if (send(it.next(), sseEventBuilder)) {
                i++;
            }
        }
        return i;
    }

    @Override // com.github.sseserver.LocalConnectionService
    public int sendAll(SseEmitter.SseEventBuilder sseEventBuilder) {
        return send(getConnectionAll(), sseEventBuilder);
    }

    @Override // com.github.sseserver.LocalConnectionService
    public int sendByConnectionId(Collection<Long> collection, SseEmitter.SseEventBuilder sseEventBuilder) {
        int i = 0;
        Iterator<Long> it = collection.iterator();
        while (it.hasNext()) {
            if (send(getConnectionById(it.next()), sseEventBuilder)) {
                i++;
            }
        }
        return i;
    }

    @Override // com.github.sseserver.LocalConnectionService
    public int sendByChannel(Collection<String> collection, SseEmitter.SseEventBuilder sseEventBuilder) {
        int i = 0;
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            i += send(getConnectionByChannel(it.next()), sseEventBuilder);
        }
        return i;
    }

    @Override // com.github.sseserver.LocalConnectionService
    public int sendByAccessToken(Collection<String> collection, SseEmitter.SseEventBuilder sseEventBuilder) {
        int i = 0;
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            i += send(getConnectionByAccessToken(it.next()), sseEventBuilder);
        }
        return i;
    }

    @Override // com.github.sseserver.LocalConnectionService
    public int sendByUserId(Collection<?> collection, SseEmitter.SseEventBuilder sseEventBuilder) {
        int i = 0;
        Iterator<?> it = collection.iterator();
        while (it.hasNext()) {
            i += send(getConnectionByUserId(it.next()), sseEventBuilder);
        }
        return i;
    }

    @Override // com.github.sseserver.LocalConnectionService
    public int sendByCustomerId(Collection<?> collection, SseEmitter.SseEventBuilder sseEventBuilder) {
        int i = 0;
        Iterator<?> it = collection.iterator();
        while (it.hasNext()) {
            i += send(getConnectionByCustomerId(it.next()), sseEventBuilder);
        }
        return i;
    }

    @Override // com.github.sseserver.LocalConnectionService
    public <ACCESS_USER extends AccessUser & AccessToken> List<ACCESS_USER> getUsers() {
        return (List) this.connectionMap.values().stream().map(sseEmitter -> {
            return sseEmitter.getAccessUser();
        }).filter(obj -> {
            return Objects.nonNull(obj);
        }).distinct().collect(Collectors.toList());
    }

    @Override // com.github.sseserver.LocalConnectionService
    public <ACCESS_USER extends AccessUser & AccessToken> ACCESS_USER getUser(Object obj) {
        List<SseEmitter> connectionByUserId = getConnectionByUserId(obj);
        if (connectionByUserId.isEmpty()) {
            return null;
        }
        return (ACCESS_USER) connectionByUserId.get(0).getAccessUser();
    }

    @Override // com.github.sseserver.LocalConnectionService
    public boolean isOnline(Object obj) {
        Set<String> set = this.userId2AccessTokenMap.get(wrapStringKey(obj));
        return (set == null || set.isEmpty()) ? false : true;
    }

    @Override // com.github.sseserver.LocalConnectionService
    public List<Long> getConnectionIds() {
        return new ArrayList(this.connectionMap.keySet());
    }

    @Override // com.github.sseserver.LocalConnectionService
    public List<String> getAccessTokens() {
        return new ArrayList(this.accessToken2ConnectionIdMap.keySet());
    }

    @Override // com.github.sseserver.LocalConnectionService
    public List<String> getUserIds() {
        return (List) this.connectionMap.values().stream().map((v0) -> {
            return v0.getUserId();
        }).filter(Objects::nonNull).map((v0) -> {
            return v0.toString();
        }).map((v1) -> {
            return wrapStringKey(v1);
        }).distinct().collect(Collectors.toList());
    }

    @Override // com.github.sseserver.LocalConnectionService
    public List<String> getCustomerIds() {
        return (List) this.connectionMap.values().stream().map((v0) -> {
            return v0.getCustomerId();
        }).filter(Objects::nonNull).map((v0) -> {
            return v0.toString();
        }).map((v1) -> {
            return wrapStringKey(v1);
        }).distinct().collect(Collectors.toList());
    }

    @Override // com.github.sseserver.LocalConnectionService
    public List<String> getChannels() {
        return (List) this.connectionMap.values().stream().map((v0) -> {
            return v0.getChannel();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).map((v0) -> {
            return v0.toString();
        }).map((v1) -> {
            return wrapStringKey(v1);
        }).distinct().collect(Collectors.toList());
    }

    @Override // com.github.sseserver.LocalConnectionService
    public int getAccessTokenCount() {
        return this.accessToken2ConnectionIdMap.size();
    }

    @Override // com.github.sseserver.LocalConnectionService
    public int getUserCount() {
        return (int) this.connectionMap.values().stream().map((v0) -> {
            return v0.getChannel();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).map((v0) -> {
            return v0.toString();
        }).map((v1) -> {
            return wrapStringKey(v1);
        }).distinct().count();
    }

    @Override // com.github.sseserver.LocalConnectionService
    public int getConnectionCount() {
        return this.connectionMap.size();
    }

    protected Runnable completionCallBack(SseEmitter sseEmitter) {
        return () -> {
            sseEmitter.disconnect();
            log.debug("sse {} completion 结束连接：{}", this.beanName, sseEmitter);
        };
    }

    protected Runnable timeoutCallBack(SseEmitter sseEmitter) {
        return () -> {
            sseEmitter.disconnect();
            log.debug("sse {} timeout 超过最大连接时间：{}", this.beanName, sseEmitter);
        };
    }

    protected Consumer<Throwable> errorCallBack(SseEmitter sseEmitter) {
        return th -> {
            sseEmitter.disconnect();
            log.debug("sse {} {} error 发生错误：{}, {}", new Object[]{this.beanName, sseEmitter, th, th});
        };
    }

    protected String wrapStringKey(Object obj) {
        return obj == null ? "" : obj.toString();
    }

    protected <ACCESS_USER extends AccessUser & AccessToken> void notifyListener(SseEmitter<ACCESS_USER> sseEmitter, List<Consumer<SseEmitter>> list, Map<String, List<Predicate<SseEmitter>>> map) {
        for (Consumer<SseEmitter> consumer : list) {
            try {
                consumer.accept(sseEmitter);
            } catch (Exception e) {
                log.error("notifyListener error = {}. listener = {}, emitter = {}", new Object[]{e.toString(), consumer, sseEmitter, e});
            }
        }
        List<Predicate<SseEmitter>> list2 = map.get(wrapStringKey(sseEmitter.getAccessToken()));
        if (list2 != null) {
            Iterator it = new ArrayList(list2).iterator();
            while (it.hasNext()) {
                Predicate predicate = (Predicate) it.next();
                try {
                    if (predicate.test(sseEmitter)) {
                        list2.remove(predicate);
                    }
                } catch (Exception e2) {
                    log.error("notifyListener error = {}. predicate = {}, emitter = {}", new Object[]{e2.toString(), predicate, sseEmitter, e2});
                }
            }
        }
    }

    public <ACCESS_USER extends AccessUser & AccessToken> boolean send(SseEmitter<ACCESS_USER> sseEmitter, SseEmitter.SseEventBuilder sseEventBuilder) {
        if (sseEmitter == null || sseEmitter.isDisconnect()) {
            return false;
        }
        try {
            sseEmitter.send(sseEventBuilder);
            return true;
        } catch (IOException e) {
            sseEmitter.disconnect();
            return false;
        }
    }

    public int getReconnectTime() {
        return this.reconnectTime;
    }

    public void setReconnectTime(int i) {
        this.reconnectTime = i;
    }

    @Override // com.github.sseserver.LocalConnectionService
    public String getBeanName() {
        return this.beanName;
    }

    public void setBeanName(String str) {
        this.beanName = str;
    }

    public void destroy() {
        this.destroyFlag = true;
        this.connectionMap.values().forEach((v0) -> {
            v0.disconnect();
        });
        this.scheduled.shutdown();
    }
}
