package io.github.hylexus.jt.dashboard.client.controller.jt1078.blocking.subscriber;

import io.github.hylexus.jt.dashboard.common.consts.DashboardJt1078SessionCloseReason;
import io.github.hylexus.jt.dashboard.common.model.dto.jt1078.DashboardVideoStreamSubscriberDto;
import io.github.hylexus.jt.jt1078.spec.Jt1078Publisher;
import io.github.hylexus.jt.jt1078.spec.Jt1078SessionManager;
import io.github.hylexus.jt.jt1078.spec.Jt1078SubscriberCreator;
import io.github.hylexus.jt.jt1078.spec.Jt1078SubscriberManager;
import io.github.hylexus.jt.jt1078.spec.exception.Jt1078SessionDestroyException;
import io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector;
import io.github.hylexus.jt.jt1078.support.extension.audio.Jt1078AudioFormatConverter;
import io.github.hylexus.jt.jt1078.support.extension.audio.impl.BuiltinAudioFormatOptions;
import io.github.hylexus.jt.utils.HexStringUtils;
import io.github.hylexus.jt.utils.JtWebUtils;
import io.github.hylexus.oaks.utils.Numbers;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.NonNull;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import org.springframework.web.util.UriTemplate;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/github/hylexus/jt/dashboard/client/controller/jt1078/blocking/subscriber/BuiltinBlockingFlvStreamSubscriberWebSocket.class */
public class BuiltinBlockingFlvStreamSubscriberWebSocket extends AbstractWebSocketHandler {
    private static final Logger log = LoggerFactory.getLogger(BuiltinBlockingFlvStreamSubscriberWebSocket.class);
    public static final String PATH_PATTERN = "/api/dashboard-client/jt1078/video-stream/websocket/flv/{sim}/{channel}";
    private final Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap();
    private final UriTemplate uriTemplate = new UriTemplate("/api/dashboard-client/jt1078/video-stream/websocket/flv/{sim}/{channel}");
    private final Jt1078Publisher jt1078Publisher;
    private final Jt1078SubscriberManager jt1078SubscriberManager;
    private final Jt1078SessionManager sessionManager;
    private final Scheduler scheduler;

    public BuiltinBlockingFlvStreamSubscriberWebSocket(Jt1078Publisher jt1078Publisher, Jt1078SubscriberManager jt1078SubscriberManager, Scheduler scheduler, Jt1078SessionManager jt1078SessionManager) {
        this.jt1078Publisher = jt1078Publisher;
        this.jt1078SubscriberManager = jt1078SubscriberManager;
        this.sessionManager = jt1078SessionManager;
        this.scheduler = scheduler;
    }

    public void afterConnectionEstablished(@NonNull WebSocketSession webSocketSession) throws Exception {
        DashboardVideoStreamSubscriberDto parseParam = parseParam(webSocketSession, this.uriTemplate);
        log.info("New FLV publisher created via WebSocket: {}", parseParam);
        synchronized (this.sessionMap) {
            this.sessionMap.put(webSocketSession.getId(), webSocketSession);
        }
        HttpHeaders handshakeHeaders = webSocketSession.getHandshakeHeaders();
        Objects.requireNonNull(handshakeHeaders);
        this.jt1078Publisher.subscribe(Jt1078ChannelCollector.H264_TO_FLV_COLLECTOR, Jt1078SubscriberCreator.builder().sim(parseParam.getSim()).channelNumber(parseParam.getChannel()).timeout(Duration.ofSeconds(parseParam.getTimeout())).sourceAudioOptions((Jt1078AudioFormatConverter.AudioFormatOptions) BuiltinAudioFormatOptions.parseFrom(parseParam.getSourceAudioHints()).orElse(null)).metadata(Map.of("createdBy", getClass().getSimpleName(), "clientIp", (String) JtWebUtils.getClientIp(handshakeHeaders::getFirst).or(() -> {
            return Optional.ofNullable(webSocketSession.getRemoteAddress()).map((v0) -> {
                return v0.getHostName();
            });
        }).orElse(""), "clientUri", Optional.ofNullable(webSocketSession.getUri()).map((v0) -> {
            return v0.toString();
        }).orElse(""))).build()).publishOn(this.scheduler).onErrorComplete(Jt1078SessionDestroyException.class).onErrorComplete(TimeoutException.class).doOnError(Jt1078SessionDestroyException.class, jt1078SessionDestroyException -> {
            log.error("取消订阅(Session销毁)");
        }).doOnError(TimeoutException.class, timeoutException -> {
            log.error("取消订阅(超时, {} 秒)", Integer.valueOf(parseParam.getTimeout()));
        }).doOnError(Throwable.class, th -> {
            log.error(th.getMessage(), th);
        }).doOnNext(byteArrayJt1078Subscription -> {
            byte[] payload = byteArrayJt1078Subscription.payload();
            log.info("WebSocket outbound: {}", HexStringUtils.bytes2HexString(payload));
            try {
                webSocketSession.sendMessage(new BinaryMessage(payload));
            } catch (Throwable th2) {
                throw new RuntimeException(th2);
            }
        }).doFinally(signalType -> {
            if (parseParam.isAutoCloseJt1078SessionOnClientClosed() || this.jt1078SubscriberManager.list(parseParam.getSim(), parseParam.getChannel()).findAny().isEmpty()) {
                this.sessionManager.removeBySimAndChannelAndThenClose(parseParam.getSim(), parseParam.getChannel(), DashboardJt1078SessionCloseReason.CLOSED_BY_WEB_SOCKET);
                log.info("Jt1078SessionClosed By WebSocket: {}", parseParam);
            }
            synchronized (this.sessionMap) {
                WebSocketSession remove = this.sessionMap.remove(webSocketSession.getId());
                if (remove != null) {
                    try {
                        remove.close();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }).subscribe();
        log.info("session add : {}", webSocketSession);
    }

    public void afterConnectionClosed(WebSocketSession webSocketSession, @NonNull CloseStatus closeStatus) throws Exception {
        synchronized (this.sessionMap) {
            WebSocketSession remove = this.sessionMap.remove(webSocketSession.getId());
            if (remove != null) {
                remove.close();
            }
        }
        log.info("session {} closed with status  {}", webSocketSession, closeStatus);
    }

    public static DashboardVideoStreamSubscriberDto parseParam(WebSocketSession webSocketSession, UriTemplate uriTemplate) {
        URI uri = webSocketSession.getUri();
        Map match = uriTemplate.match(uri.getPath());
        String str = (String) match.getOrDefault("sim", "111111111111");
        short shortValue = ((Integer) Numbers.parseInteger((String) match.getOrDefault("channel", "3")).orElseThrow()).shortValue();
        String query = uri.getQuery();
        if (!StringUtils.hasText(query)) {
            return new DashboardVideoStreamSubscriberDto().setSim(str).setChannel(shortValue);
        }
        String[] split = query.split("&");
        HashMap hashMap = new HashMap();
        for (String str2 : split) {
            String[] split2 = str2.split("=");
            if (split2.length == 2) {
                hashMap.put(split2[0], split2[1]);
            }
        }
        return new DashboardVideoStreamSubscriberDto().setSim(str).setChannel(shortValue).setTimeout(((Integer) Numbers.parseInteger((String) hashMap.get("timeout")).orElse(10)).intValue()).setAutoCloseJt1078SessionOnClientClosed(parseBoolean(hashMap, "autoCloseJt1078SessionOnClientClosed", false)).setStreamType(((Integer) Numbers.parseInteger((String) hashMap.get("streamType")).orElse(0)).intValue()).setDataType(((Integer) Numbers.parseInteger((String) hashMap.get("dataType")).orElse(0)).intValue()).setSourceAudioHints((String) hashMap.get("sourceAudioHints"));
    }

    public static boolean parseBoolean(Map<String, String> map, String str, boolean z) {
        return ((Boolean) Optional.ofNullable(map.get(str)).map(str2 -> {
            try {
                return Boolean.valueOf(Boolean.parseBoolean(str2));
            } catch (Exception e) {
                return Boolean.valueOf(z);
            }
        }).orElse(Boolean.valueOf(z))).booleanValue();
    }
}
