package io.github.hylexus.jt.dashboard.client.controller.jt1078.reactive.subscriber;

import io.github.hylexus.jt.dashboard.client.controller.jt1078.model.converter.DashboardClientModelConverter;
import io.github.hylexus.jt.dashboard.common.consts.DashboardJt1078SessionCloseReason;
import io.github.hylexus.jt.dashboard.common.model.dto.jt1078.DashboardVideoStreamSubscriberDto;
import io.github.hylexus.jt.jt1078.spec.Jt1078Publisher;
import io.github.hylexus.jt.jt1078.spec.Jt1078SessionManager;
import io.github.hylexus.jt.jt1078.spec.Jt1078SubscriberCreator;
import io.github.hylexus.jt.jt1078.spec.Jt1078SubscriberManager;
import io.github.hylexus.jt.jt1078.spec.exception.Jt1078SessionDestroyException;
import io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector;
import io.github.hylexus.jt.jt1078.support.extension.audio.Jt1078AudioFormatConverter;
import io.github.hylexus.jt.jt1078.support.extension.audio.impl.BuiltinAudioFormatOptions;
import io.github.hylexus.jt.utils.HexStringUtils;
import io.github.hylexus.jt.utils.JtWebUtils;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.NonNull;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.util.UriTemplate;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/github/hylexus/jt/dashboard/client/controller/jt1078/reactive/subscriber/BuiltinFlvStreamSubscriberWebSocket.class */
public class BuiltinFlvStreamSubscriberWebSocket implements WebSocketHandler {
    private static final Logger log = LoggerFactory.getLogger(BuiltinFlvStreamSubscriberWebSocket.class);
    public static final String PATH_PATTERN = "/api/dashboard-client/jt1078/video-stream/websocket/flv/{sim}/{channel}";
    private final Jt1078Publisher jt1078Publisher;
    private final Jt1078SubscriberManager jt1078SubscriberManager;
    private final UriTemplate uriTemplate = new UriTemplate("/api/dashboard-client/jt1078/video-stream/websocket/flv/{sim}/{channel}");
    private final Scheduler scheduler;
    private final Jt1078SessionManager sessionManager;

    public BuiltinFlvStreamSubscriberWebSocket(Jt1078Publisher jt1078Publisher, Jt1078SubscriberManager jt1078SubscriberManager, Scheduler scheduler, Jt1078SessionManager jt1078SessionManager) {
        this.jt1078Publisher = jt1078Publisher;
        this.jt1078SubscriberManager = jt1078SubscriberManager;
        this.scheduler = scheduler;
        this.sessionManager = jt1078SessionManager;
    }

    @NonNull
    public Mono<Void> handle(@NonNull WebSocketSession webSocketSession) {
        DashboardVideoStreamSubscriberDto convert = DashboardClientModelConverter.convert(webSocketSession, this.uriTemplate);
        log.info("New FLV publisher created via WebSocket: {}", convert);
        return Mono.zip(webSocketSession.receive().doOnNext(webSocketMessage -> {
            if (log.isDebugEnabled()) {
                log.debug("Receive webSocket msg, webSocketSessionId={}, payload={}", webSocketSession.getId(), webSocketMessage.getPayloadAsText());
            }
        }).then(), subscribeFlv(webSocketSession, convert)).doFinally(signalType -> {
            if (convert.isAutoCloseJt1078SessionOnClientClosed() || this.jt1078SubscriberManager.list(convert.getSim(), convert.getChannel()).findAny().isEmpty()) {
                this.sessionManager.removeBySimAndChannelAndThenClose(convert.getSim(), convert.getChannel(), DashboardJt1078SessionCloseReason.CLOSED_BY_WEB_SOCKET);
                log.info("Jt1078SessionClosed By WebSocket: {}", convert);
            }
        }).then();
    }

    private Mono<Void> subscribeFlv(WebSocketSession webSocketSession, DashboardVideoStreamSubscriberDto dashboardVideoStreamSubscriberDto) {
        HttpHeaders headers = webSocketSession.getHandshakeInfo().getHeaders();
        Objects.requireNonNull(headers);
        return this.jt1078Publisher.subscribe(Jt1078ChannelCollector.H264_TO_FLV_COLLECTOR, Jt1078SubscriberCreator.builder().sim(dashboardVideoStreamSubscriberDto.getSim()).channelNumber(dashboardVideoStreamSubscriberDto.getChannel()).timeout(Duration.ofSeconds(dashboardVideoStreamSubscriberDto.getTimeout())).sourceAudioOptions((Jt1078AudioFormatConverter.AudioFormatOptions) BuiltinAudioFormatOptions.parseFrom(dashboardVideoStreamSubscriberDto.getSourceAudioHints()).orElse(null)).metadata(Map.of("createdBy", getClass().getSimpleName(), "clientIp", (String) JtWebUtils.getClientIp(headers::getFirst).or(() -> {
            return Optional.ofNullable(webSocketSession.getHandshakeInfo().getRemoteAddress()).map((v0) -> {
                return v0.getHostName();
            });
        }).orElse(""), "clientUri", webSocketSession.getHandshakeInfo().getUri().toString())).build()).publishOn(this.scheduler).onErrorComplete(Jt1078SessionDestroyException.class).onErrorComplete(TimeoutException.class).doOnError(Jt1078SessionDestroyException.class, jt1078SessionDestroyException -> {
            log.info("取消订阅(Session销毁)");
        }).doOnError(TimeoutException.class, timeoutException -> {
            log.info("取消订阅(超时, {} 秒)", Integer.valueOf(dashboardVideoStreamSubscriberDto.getTimeout()));
        }).doOnError(Throwable.class, th -> {
            log.error(th.getMessage(), th);
        }).flatMap(byteArrayJt1078Subscription -> {
            byte[] payload = byteArrayJt1078Subscription.payload();
            if (log.isDebugEnabled()) {
                log.debug("FLV WebSocket outbound: {}", HexStringUtils.bytes2HexString(payload));
            }
            return Mono.just(webSocketSession.binaryMessage(dataBufferFactory -> {
                return dataBufferFactory.wrap(payload);
            }));
        }).flatMap(webSocketMessage -> {
            return webSocketSession.send(Mono.just(webSocketMessage));
        }).then();
    }
}
