package io.github.hylexus.xtream.codec.server.reactive.spec.impl;

import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamExchange;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamInbound;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSession;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionEventListener;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionIdGenerator;
import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager;
import io.github.hylexus.xtream.codec.server.reactive.spec.common.XtreamIntervalChecker;
import io.github.hylexus.xtream.codec.server.reactive.spec.domain.values.UdpSessionIdleStateCheckerProps;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

/* loaded from: input_file:io/github/hylexus/xtream/codec/server/reactive/spec/impl/AbstractXtreamSessionManager.class */
public abstract class AbstractXtreamSessionManager<S extends XtreamSession> implements XtreamSessionManager<S> {
    private static final Logger log = LoggerFactory.getLogger(AbstractXtreamSessionManager.class);
    protected final XtreamSessionIdGenerator sessionIdGenerator;
    protected final XtreamIntervalChecker checker;
    private final Duration maxIdleTime;
    protected final ConcurrentMap<String, S> sessions = new ConcurrentHashMap();
    private final AtomicLong tcpSessionCount = new AtomicLong(0);
    private final AtomicLong udpSessionCount = new AtomicLong(0);
    private final Clock clock = Clock.system(ZoneId.of("Asia/Shanghai"));
    private final Lock lock = new ReentrantLock();
    protected final List<XtreamSessionEventListener> listenerList = new ArrayList();

    public AbstractXtreamSessionManager(boolean z, UdpSessionIdleStateCheckerProps udpSessionIdleStateCheckerProps, XtreamSessionIdGenerator xtreamSessionIdGenerator) {
        this.sessionIdGenerator = xtreamSessionIdGenerator;
        if (z) {
            this.checker = new XtreamIntervalChecker("session-checker", udpSessionIdleStateCheckerProps.getCheckInterval(), udpSessionIdleStateCheckerProps.getCheckBackoffTime(), (str, l) -> {
                log.debug("{} interval: {}", str, l);
                doCheckUdpSessionStatus();
                return Mono.empty();
            });
            this.checker.start();
        } else {
            this.checker = null;
        }
        this.maxIdleTime = udpSessionIdleStateCheckerProps.getMaxIdleTime();
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager
    public XtreamSessionIdGenerator sessionIdGenerator() {
        return this.sessionIdGenerator;
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager
    public void addListener(XtreamSessionEventListener xtreamSessionEventListener) {
        this.listenerList.add(xtreamSessionEventListener);
    }

    void doCheckUdpSessionStatus() {
        if (!this.sessions.isEmpty() && this.lock.tryLock()) {
            try {
                Instant instant = this.clock.instant();
                for (S s : this.sessions.values()) {
                    if (s.type() == XtreamInbound.Type.UDP && isExpired(s, instant)) {
                        log.info("Session expired, remove it: {}", s);
                        s.invalidate(XtreamSessionEventListener.DefaultSessionCloseReason.EXPIRED);
                    }
                }
            } finally {
                this.lock.unlock();
            }
        }
    }

    protected boolean isExpired(S s, Instant instant) {
        return instant.minus((TemporalAmount) this.maxIdleTime).isAfter(s.lastCommunicateTime());
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager
    public Mono<S> getSession(XtreamExchange xtreamExchange) {
        return Mono.defer(() -> {
            return Mono.justOrEmpty(this.sessions.get(this.sessionIdGenerator.generateSessionId(xtreamExchange)));
        });
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager
    public Mono<S> createSession(String str, XtreamExchange xtreamExchange) {
        S doCreateSession = doCreateSession(str, xtreamExchange);
        this.sessions.put(str, doCreateSession);
        if (doCreateSession.type() == XtreamInbound.Type.TCP) {
            this.tcpSessionCount.incrementAndGet();
        } else {
            this.udpSessionCount.incrementAndGet();
        }
        invokeListener(xtreamSessionEventListener -> {
            xtreamSessionEventListener.afterSessionCreate(doCreateSession);
        });
        return Mono.just(doCreateSession);
    }

    protected void invokeListener(Consumer<XtreamSessionEventListener> consumer) {
        Iterator<XtreamSessionEventListener> it = this.listenerList.iterator();
        while (it.hasNext()) {
            try {
                consumer.accept(it.next());
            } catch (Exception e) {
                log.error("Error occurred while invoke listener", e);
            }
        }
    }

    protected abstract S doCreateSession(String str, XtreamExchange xtreamExchange);

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager
    public Mono<S> getSessionById(String str) {
        return Mono.justOrEmpty(this.sessions.get(str));
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager
    public void closeSession(S s, XtreamSessionEventListener.SessionCloseReason sessionCloseReason) {
        this.sessions.remove(s.id());
        doClose(s, sessionCloseReason);
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager
    public boolean closeSessionById(String str, XtreamSessionEventListener.SessionCloseReason sessionCloseReason) {
        S remove = this.sessions.remove(str);
        if (remove == null) {
            return false;
        }
        doClose(remove, sessionCloseReason);
        return true;
    }

    protected void doClose(S s, XtreamSessionEventListener.SessionCloseReason sessionCloseReason) {
        if (s.type() == XtreamInbound.Type.TCP) {
            this.tcpSessionCount.decrementAndGet();
        } else {
            this.udpSessionCount.decrementAndGet();
        }
        invokeListener(xtreamSessionEventListener -> {
            xtreamSessionEventListener.beforeSessionClose(s, sessionCloseReason);
        });
        s.outbound().withConnection(connection -> {
            try {
                beforeConnectionClose(connection, s);
            } finally {
                if (s.type() == XtreamInbound.Type.TCP) {
                    connection.dispose();
                }
            }
        });
    }

    protected void beforeConnectionClose(Connection connection, S s) {
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager
    public Stream<S> list() {
        return this.sessions.values().stream();
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager
    public long count() {
        return this.sessions.size();
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager
    public long countTcp() {
        return this.tcpSessionCount.get();
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager
    public long countUdp() {
        return this.udpSessionCount.get();
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSessionManager
    public void shutdown() {
        if (this.checker != null) {
            this.checker.stop();
        }
    }
}
