package io.github.hylexus.jt.jt1078.support.codec.impl.collector;

import io.github.hylexus.jt.jt1078.spec.Jt1078PayloadType;
import io.github.hylexus.jt.jt1078.spec.Jt1078Request;
import io.github.hylexus.jt.jt1078.spec.Jt1078SubscriberCreator;
import io.github.hylexus.jt.jt1078.spec.Jt1078Subscription;
import io.github.hylexus.jt.jt1078.spec.impl.request.DefaultJt1078PayloadType;
import io.github.hylexus.jt.jt1078.spec.impl.subscription.ByteArrayJt1078Subscription;
import io.github.hylexus.jt.jt1078.spec.impl.subscription.DefaultJt1078SubscriptionType;
import io.github.hylexus.jt.jt1078.support.extension.flv.FlvHeader;
import io.github.hylexus.jt.jt1078.support.extension.flv.impl.DefaultFlvEncoder;
import io.github.hylexus.jt.utils.ByteBufUtils;
import io.netty.buffer.ByteBuf;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:io/github/hylexus/jt/jt1078/support/codec/impl/collector/H264ToFlvJt1078ChannelCollector.class */
public class H264ToFlvJt1078ChannelCollector extends AbstractAsyncChannelCollector<ByteArrayJt1078Subscription, H264ToFlvSubscriber> {
    private final DefaultFlvEncoder flvEncoder;
    static final Set<Jt1078PayloadType> SUPPORTED_PAYLOAD_TYPES = Set.of(DefaultJt1078PayloadType.H264, DefaultJt1078PayloadType.ADPCMA, DefaultJt1078PayloadType.G_726, DefaultJt1078PayloadType.G_711A);
    private static final Map<Jt1078PayloadType, Boolean> WARNING_FLAGS = new HashMap();

    public H264ToFlvJt1078ChannelCollector(ThreadFactory threadFactory, Jt1078SubscriberCreator jt1078SubscriberCreator) {
        super(threadFactory);
        this.flvEncoder = new DefaultFlvEncoder(jt1078SubscriberCreator);
    }

    private boolean isSupported(Jt1078PayloadType jt1078PayloadType) {
        return SUPPORTED_PAYLOAD_TYPES.contains(jt1078PayloadType);
    }

    @Override // io.github.hylexus.jt.jt1078.support.codec.impl.collector.AbstractChannelCollector
    protected void doCollect(Jt1078Request jt1078Request) {
        ByteBuf lastIFrame;
        ByteBuf flvBasicFrame;
        Jt1078PayloadType payloadType = jt1078Request.header().payloadType();
        if (!isSupported(payloadType)) {
            doErrorLogIfNecessary(jt1078Request, payloadType);
            return;
        }
        Iterator<ByteBuf> it = this.flvEncoder.encode(jt1078Request).iterator();
        while (it.hasNext()) {
            ByteBuf next = it.next();
            try {
                ByteArrayJt1078Subscription forByteArray = Jt1078Subscription.forByteArray(DefaultJt1078SubscriptionType.FLV, ByteBufUtils.getBytes(next));
                for (H264ToFlvSubscriber h264ToFlvSubscriber : this.subscriberMapping.values()) {
                    FluxSink<ByteArrayJt1078Subscription> sink = h264ToFlvSubscriber.sink();
                    if (!h264ToFlvSubscriber.isFlvHeaderSent() && (flvBasicFrame = this.flvEncoder.getFlvBasicFrame()) != null) {
                        sink.next(Jt1078Subscription.forByteArray(DefaultJt1078SubscriptionType.FLV, FlvHeader.of(true, this.flvEncoder.isHasAudio()).toBytes(true)));
                        sink.next(Jt1078Subscription.forByteArray(DefaultJt1078SubscriptionType.FLV, flvBasicFrame));
                        h264ToFlvSubscriber.setFlvHeaderSent(true);
                    }
                    if (!h264ToFlvSubscriber.isLastIFrameSent() && (lastIFrame = this.flvEncoder.getLastIFrame()) != null) {
                        sink.next(Jt1078Subscription.forByteArray(DefaultJt1078SubscriptionType.FLV, lastIFrame));
                        h264ToFlvSubscriber.setLastIFrameSent(true);
                    }
                    if (!h264ToFlvSubscriber.isLastIFrameSent()) {
                        return;
                    }
                    if (!h264ToFlvSubscriber.isFlvHeaderSent()) {
                        next.release();
                        return;
                    }
                    sink.next(forByteArray);
                }
                next.release();
            } finally {
                next.release();
            }
        }
    }

    @Override // io.github.hylexus.jt.jt1078.support.codec.impl.collector.AbstractChannelCollector
    protected H264ToFlvSubscriber createSubscribe(String str, Jt1078SubscriberCreator jt1078SubscriberCreator, FluxSink<ByteArrayJt1078Subscription> fluxSink) {
        return new H264ToFlvSubscriber(str, jt1078SubscriberCreator.sim(), jt1078SubscriberCreator.channelNumber(), "H.264 --> FLV", LocalDateTime.now(), jt1078SubscriberCreator.metadata(), fluxSink);
    }

    private void doErrorLogIfNecessary(Jt1078Request jt1078Request, Jt1078PayloadType jt1078PayloadType) {
        Boolean bool = WARNING_FLAGS.get(jt1078PayloadType);
        if (bool == null || !bool.booleanValue()) {
            WARNING_FLAGS.put(jt1078PayloadType, true);
            this.log.error("Unsupported payloadType : {}. sim = {}, channelNumber = {}, dataType = {}, request = {}", new Object[]{jt1078PayloadType, jt1078Request.sim(), Short.valueOf(jt1078Request.channelNumber()), jt1078Request.header().dataType(), jt1078Request});
        }
    }

    @Override // io.github.hylexus.jt.jt1078.support.codec.impl.collector.AbstractAsyncChannelCollector, io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector
    public void close() {
        try {
            super.close();
        } catch (Exception e) {
            this.log.error(e.getMessage(), e);
        }
        try {
            this.flvEncoder.close();
        } catch (Exception e2) {
            this.log.error(e2.getMessage(), e2);
        }
    }

    @Override // io.github.hylexus.jt.jt1078.support.codec.impl.collector.AbstractChannelCollector
    protected /* bridge */ /* synthetic */ InternalSubscriber createSubscribe(String str, Jt1078SubscriberCreator jt1078SubscriberCreator, FluxSink fluxSink) {
        return createSubscribe(str, jt1078SubscriberCreator, (FluxSink<ByteArrayJt1078Subscription>) fluxSink);
    }
}
