package io.github.hylexus.jt.jt808.spec.session;

import io.github.hylexus.jt.annotation.BuiltinComponent;
import io.github.hylexus.jt.jt808.Jt808ProtocolVersion;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link Jt808SessionManager} implementation that keeps sessions in memory.
 *
 * <p>Two maps are maintained: {@code terminalId -> session} and the reverse lookup
 * {@code sessionId -> terminalId}. Mutations that touch both maps are performed while holding
 * the write lock of a {@link ReentrantReadWriteLock} so the two maps stay consistent with each
 * other. Registered {@link Jt808SessionEventListener}s are notified when a session is added,
 * removed or closed.
 *
 * <p>The class is a process-wide singleton obtained through
 * {@link #getInstance(Jt808FlowIdGeneratorFactory)}.
 */
@BuiltinComponent
public class DefaultJt808SessionManager implements Jt808SessionManager {
    private static final Logger log = LoggerFactory.getLogger(DefaultJt808SessionManager.class);
    private static volatile Jt808SessionManager INSTANCE;

    private final Jt808FlowIdGeneratorFactory flowIdGeneratorFactory;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    /** terminalId -> session. */
    private final Map<String, Jt808Session> sessionMap = new ConcurrentHashMap<>();
    /** sessionId -> terminalId (reverse lookup for {@link #sessionMap}). */
    private final Map<String, String> sessionIdTerminalIdMapping = new ConcurrentHashMap<>();
    /**
     * Copy-on-write so that listeners can be registered while another thread is in the middle
     * of dispatching an event, without any {@code ConcurrentModificationException}.
     */
    private final List<Jt808SessionEventListener> listeners = new CopyOnWriteArrayList<>();

    private DefaultJt808SessionManager(Jt808FlowIdGeneratorFactory jt808FlowIdGeneratorFactory) {
        this.flowIdGeneratorFactory = jt808FlowIdGeneratorFactory;
    }

    /**
     * Returns the singleton instance, creating it on first use.
     *
     * @param jt808FlowIdGeneratorFactory factory used by the instance to create a flow-id
     *                                    generator per session; only the argument of the call
     *                                    that actually creates the instance is used, later
     *                                    arguments are ignored
     * @return the shared session manager
     */
    public static Jt808SessionManager getInstance(Jt808FlowIdGeneratorFactory jt808FlowIdGeneratorFactory) {
        Jt808SessionManager current = INSTANCE;
        if (current == null) {
            synchronized (DefaultJt808SessionManager.class) {
                current = INSTANCE;
                if (current == null) {
                    current = new DefaultJt808SessionManager(jt808FlowIdGeneratorFactory);
                    INSTANCE = current;
                }
            }
        }
        return current;
    }

    /**
     * Streams the currently stored sessions.
     *
     * <p>The read lock is held only while the stream is created; the stream itself is evaluated
     * lazily by the caller after the lock has been released.
     *
     * @return a stream over the values of the session map
     */
    @Override
    public Stream<Jt808Session> list() {
        this.lock.readLock().lock();
        try {
            return this.sessionMap.values().stream();
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * @return the number of stored sessions
     */
    @Override
    public long count() {
        return this.sessionMap.size();
    }

    /**
     * @param predicate filter applied to every stored session
     * @return the number of stored sessions accepted by {@code predicate}
     */
    @Override
    public long count(Predicate<Jt808Session> predicate) {
        return this.sessionMap.values().stream().filter(predicate).count();
    }

    /**
     * Creates a new session object using a flow-id generator obtained from the configured
     * factory. The session is NOT stored; see {@link #persistence(Jt808Session)}.
     *
     * @param str                  terminal id
     * @param jt808ProtocolVersion protocol version of the terminal
     * @param channel              channel the terminal is connected through
     * @return the newly built session
     */
    @Override
    public Jt808Session generateSession(String str, Jt808ProtocolVersion jt808ProtocolVersion, Channel channel) {
        return buildSession(str, jt808ProtocolVersion, channel, this.flowIdGeneratorFactory.create());
    }

    /**
     * Builds and populates a {@link DefaultJt808Session}.
     *
     * @param str                  terminal id
     * @param jt808ProtocolVersion protocol version of the terminal
     * @param channel              channel of the terminal; also the input of
     *                             {@code generateSessionId(channel)}
     * @param jt808FlowIdGenerator flow-id generator handed to the session
     * @return the populated session, with its last-communicate timestamp set to "now"
     */
    protected DefaultJt808Session buildSession(String str, Jt808ProtocolVersion jt808ProtocolVersion, Channel channel, Jt808FlowIdGenerator jt808FlowIdGenerator) {
        DefaultJt808Session session = new DefaultJt808Session(jt808FlowIdGenerator);
        session.channel(channel);
        session.id(generateSessionId(channel));
        session.terminalId(str);
        session.lastCommunicateTimestamp(System.currentTimeMillis());
        session.protocolVersion(jt808ProtocolVersion);
        return session;
    }

    /**
     * Looks a session up by its session id via the reverse mapping.
     *
     * @param str session id
     * @return the session, or empty when the id is unknown or the session's channel is no
     *         longer active (in which case the session is removed and closed, see
     *         {@link #findByTerminalId(String, boolean)})
     */
    @Override
    public Optional<Jt808Session> findBySessionId(String str) {
        String terminalId = this.sessionIdTerminalIdMapping.get(str);
        if (StringUtils.isEmpty(terminalId)) {
            return Optional.empty();
        }
        return findByTerminalId(terminalId, false);
    }

    /**
     * Returns the stored session of a terminal, creating and storing a new one when none is
     * found. When a session exists but is bound to a different channel instance, its channel
     * is replaced by {@code channel}.
     *
     * <p>NOTE(review): the lookup and the subsequent store are not performed atomically, so two
     * threads racing on the same terminal id may both create a session; the later
     * {@link #persistence(Jt808Session)} call wins.
     *
     * @param str                  terminal id
     * @param jt808ProtocolVersion protocol version used when a new session must be created
     * @param channel              channel the terminal is currently connected through
     * @param z                    whether to refresh the last-communicate timestamp of an
     *                             existing session
     * @return the existing or newly stored session
     */
    @Override
    public Jt808Session persistenceIfNecessary(String str, Jt808ProtocolVersion jt808ProtocolVersion, Channel channel, boolean z) {
        Optional<Jt808Session> existing = findByTerminalId(str, z);
        if (!existing.isPresent()) {
            Jt808Session created = generateSession(str, jt808ProtocolVersion, channel);
            persistence(created);
            return created;
        }
        Jt808Session session = existing.get();
        if (session.channel() != channel) {
            log.warn("replace channel for terminal({}), new:{}, old:{}", str, channel.remoteAddress(), session.channel().remoteAddress());
            session.channel(channel);
        }
        return session;
    }

    /**
     * Stores a session under its terminal id, records the reverse mapping and notifies
     * listeners through {@code onSessionAdd}.
     *
     * <p>If another session with a different session id was stored for the same terminal, its
     * reverse mapping is removed; otherwise the stale session id would keep resolving to this
     * terminal and removing by that old id would evict the new session.
     *
     * @param jt808Session session to store
     */
    @Override
    public void persistence(Jt808Session jt808Session) {
        this.lock.writeLock().lock();
        try {
            String terminalId = jt808Session.terminalId();
            Jt808Session previous = this.sessionMap.put(terminalId, jt808Session);
            if (previous != null && previous != jt808Session && !previous.id().equals(jt808Session.id())) {
                // Conditional remove: only drop the old id if it still points at this terminal.
                this.sessionIdTerminalIdMapping.remove(previous.id(), terminalId);
            }
            this.sessionIdTerminalIdMapping.put(jt808Session.id(), terminalId);
            invokeListeners(listener -> listener.onSessionAdd(jt808Session));
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Invokes {@code consumer} once per registered listener. A failure in one listener is
     * logged and does not prevent the remaining listeners from being invoked.
     */
    private void invokeListeners(Consumer<Jt808SessionEventListener> consumer) {
        for (Jt808SessionEventListener listener : this.listeners) {
            try {
                consumer.accept(listener);
            } catch (Throwable th) {
                // Deliberately broad: listeners are third-party code and must not break session bookkeeping.
                log.error("An error occurred while invoke Jt808SessionManagerEventListener", th);
            }
        }
    }

    /**
     * Removes a session by its session id and notifies listeners through
     * {@code onSessionRemove}. The channel is not closed.
     *
     * @param str session id
     * @return the removed session, or {@code null} when no session is stored for that id
     */
    @Override
    public Jt808Session removeBySessionId(String str) {
        this.lock.writeLock().lock();
        try {
            String terminalId = this.sessionIdTerminalIdMapping.remove(str);
            if (terminalId == null) {
                return null;
            }
            Jt808Session removed = this.sessionMap.remove(terminalId);
            if (removed != null) {
                // Only notify when a session was actually removed; never hand null to listeners.
                invokeListeners(listener -> listener.onSessionRemove(removed));
            }
            return removed;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Removes a session by its session id and, if one was removed, notifies listeners through
     * {@code onSessionClose} and closes the session's channel.
     *
     * @param str                session id
     * @param sessionCloseReason reason passed on to the listeners
     */
    @Override
    public void removeBySessionIdAndClose(String str, SessionCloseReason sessionCloseReason) {
        this.lock.writeLock().lock();
        try {
            // The write lock is reentrant, so the nested acquisition in removeBySessionId is fine.
            Jt808Session removed = removeBySessionId(str);
            if (removed != null) {
                invokeListeners(listener -> listener.onSessionClose(removed, sessionCloseReason));
                removed.channel().close();
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Looks a session up by terminal id.
     *
     * <p>If the stored session's channel is no longer active, the session is removed and closed
     * with reason {@code CHANNEL_INACTIVE} and an empty result is returned.
     *
     * @param str terminal id
     * @param z   whether to set the session's last-communicate timestamp to "now"
     * @return the active session, or empty when none is stored or its channel is inactive
     */
    @Override
    public Optional<Jt808Session> findByTerminalId(String str, boolean z) {
        Jt808Session session = this.sessionMap.get(str);
        if (session == null) {
            return Optional.empty();
        }
        if (z) {
            session.lastCommunicateTimestamp(System.currentTimeMillis());
        }
        return checkStatus(session) ? Optional.of(session) : Optional.empty();
    }

    /**
     * Returns {@code true} when the session's channel is active; otherwise removes and closes
     * the session and returns {@code false}.
     */
    private boolean checkStatus(Jt808Session jt808Session) {
        if (jt808Session.channel().isActive()) {
            return true;
        }
        log.debug("Remove session [{}], because channel !isActive() ", jt808Session.terminalId());
        removeBySessionIdAndClose(jt808Session.id(), DefaultSessionCloseReason.CHANNEL_INACTIVE);
        return false;
    }

    /**
     * Registers a listener that will receive subsequent session events.
     *
     * @param jt808SessionEventListener listener to add
     * @return this manager, for chaining
     */
    @Override
    public Jt808SessionManager addListener(Jt808SessionEventListener jt808SessionEventListener) {
        this.listeners.add(jt808SessionEventListener);
        return this;
    }

    /**
     * @return the live list of registered listeners (not a copy)
     */
    @Override
    public List<Jt808SessionEventListener> getListeners() {
        return this.listeners;
    }
}
