package io.github.hylexus.jt.dashboard.client.controller.jt1078.reactive.subscriber;

import io.github.hylexus.jt.dashboard.common.consts.DashboardJt1078SessionCloseReason;
import io.github.hylexus.jt.dashboard.common.model.dto.jt1078.DashboardVideoStreamSubscriberDto;
import io.github.hylexus.jt.jt1078.spec.Jt1078Publisher;
import io.github.hylexus.jt.jt1078.spec.Jt1078SessionManager;
import io.github.hylexus.jt.jt1078.spec.Jt1078SubscriberCreator;
import io.github.hylexus.jt.jt1078.spec.Jt1078SubscriberManager;
import io.github.hylexus.jt.jt1078.spec.exception.Jt1078SessionDestroyException;
import io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector;
import io.github.hylexus.jt.utils.FormatUtils;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;

@RequestMapping({"/api/dashboard-client/jt1078/video-stream/http/flv"})
@Controller
/* loaded from: input_file:io/github/hylexus/jt/dashboard/client/controller/jt1078/reactive/subscriber/BuiltinFlvStreamSubscriberHttp.class */
public class BuiltinFlvStreamSubscriberHttp {
    private static final Logger log = LoggerFactory.getLogger(BuiltinFlvStreamSubscriberHttp.class);
    private final Jt1078SessionManager sessionManager;
    private final Jt1078SubscriberManager jt1078SubscriberManager;
    private final Jt1078Publisher publisher;
    private final Scheduler scheduler;

    public BuiltinFlvStreamSubscriberHttp(Jt1078SessionManager jt1078SessionManager, Jt1078SubscriberManager jt1078SubscriberManager, Jt1078Publisher jt1078Publisher, Scheduler scheduler) {
        this.sessionManager = jt1078SessionManager;
        this.jt1078SubscriberManager = jt1078SubscriberManager;
        this.publisher = jt1078Publisher;
        this.scheduler = scheduler;
    }

    @RequestMapping({"/{sim}/{channel}"})
    public ResponseEntity<Flux<byte[]>> handle(ServerWebExchange serverWebExchange, DashboardVideoStreamSubscriberDto dashboardVideoStreamSubscriberDto) {
        serverWebExchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_OCTET_STREAM);
        return ResponseEntity.ok().body(subscribeFlvStream(dashboardVideoStreamSubscriberDto, serverWebExchange));
    }

    private Flux<byte[]> subscribeFlvStream(DashboardVideoStreamSubscriberDto dashboardVideoStreamSubscriberDto, ServerWebExchange serverWebExchange) {
        int timeout = dashboardVideoStreamSubscriberDto.getTimeout();
        return this.publisher.subscribe(Jt1078ChannelCollector.H264_TO_FLV_COLLECTOR, Jt1078SubscriberCreator.builder().sim(dashboardVideoStreamSubscriberDto.getSim()).channelNumber(dashboardVideoStreamSubscriberDto.getChannel()).timeout(Duration.ofSeconds(dashboardVideoStreamSubscriberDto.getTimeout())).metadata(Map.of("createdBy", getClass().getSimpleName(), "clientUri", serverWebExchange.getRequest().getURI().toString())).build()).publishOn(this.scheduler).onErrorComplete(Jt1078SessionDestroyException.class).onErrorComplete(TimeoutException.class).doOnError(Jt1078SessionDestroyException.class, jt1078SessionDestroyException -> {
            log.error("取消订阅(Session销毁)");
        }).doOnError(TimeoutException.class, timeoutException -> {
            log.error("取消订阅(超时, {} 秒)", Integer.valueOf(timeout));
        }).doOnError(Throwable.class, th -> {
            log.error(th.getMessage(), th);
        }).doOnNext(byteArrayJt1078Subscription -> {
            byte[] payload = byteArrayJt1078Subscription.payload();
            if (log.isDebugEnabled()) {
                log.debug("Http outbound {}", FormatUtils.toHexString(payload));
            }
        }).map((v0) -> {
            return v0.payload();
        }).doFinally(signalType -> {
            log.info("Http outbound complete with signal: {}", signalType);
            if (dashboardVideoStreamSubscriberDto.isAutoCloseJt1078SessionOnClientClosed() || this.jt1078SubscriberManager.list(dashboardVideoStreamSubscriberDto.getSim(), dashboardVideoStreamSubscriberDto.getChannel()).findAny().isEmpty()) {
                this.sessionManager.removeBySimAndChannelAndThenClose(dashboardVideoStreamSubscriberDto.getSim(), dashboardVideoStreamSubscriberDto.getChannel(), DashboardJt1078SessionCloseReason.CLOSED_BY_HTTP);
                log.info("Jt1078SessionClosed By HttpStream: {}", dashboardVideoStreamSubscriberDto);
            }
        });
    }
}
