package io.github.hylexus.jt.jt1078.support.codec.impl.collector;

import io.github.hylexus.jt.jt1078.spec.Jt1078Request;
import io.github.hylexus.jt.jt1078.spec.Jt1078Subscriber;
import io.github.hylexus.jt.jt1078.spec.Jt1078SubscriberCreator;
import io.github.hylexus.jt.jt1078.spec.Jt1078SubscriberDescriptor;
import io.github.hylexus.jt.jt1078.spec.Jt1078Subscription;
import io.github.hylexus.jt.jt1078.spec.exception.Jt1078SubscriberCloseException;
import io.github.hylexus.jt.jt1078.spec.impl.subscription.DefaultJt1078Subscriber;
import io.github.hylexus.jt.jt1078.spec.impl.subscription.DefaultJt1078SubscriberDescriptor;
import io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector;
import io.github.hylexus.jt.jt1078.support.codec.impl.collector.InternalSubscriber;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:io/github/hylexus/jt/jt1078/support/codec/impl/collector/AbstractChannelCollector.class */
/**
 * Base {@link Jt1078ChannelCollector} that keeps a registry of live subscribers and leaves the
 * actual handling of incoming requests to subclasses via {@link #doCollect(Jt1078Request)}.
 *
 * <p>Subscribers are stored in {@link #subscriberMapping}, keyed by the subscriber id. An entry is
 * added when the {@link Flux} returned from {@link #doSubscribe(Jt1078SubscriberCreator)} is
 * subscribed to, and removed again when that flux terminates for any reason.
 *
 * @param <PT>  type of the subscription payload pushed to subscribers
 * @param <IST> type of the internal bookkeeping object kept for every subscriber
 */
public abstract class AbstractChannelCollector<PT extends Jt1078Subscription, IST extends InternalSubscriber<PT>> implements Jt1078ChannelCollector<PT> {
    private static final Logger log = LoggerFactory.getLogger(AbstractChannelCollector.class);

    /** Live subscribers, keyed by subscriber id. */
    protected final ConcurrentMap<String, IST> subscriberMapping = new ConcurrentHashMap<>();

    /**
     * Entry point for incoming requests; delegates directly to {@link #doCollect(Jt1078Request)}.
     *
     * @param jt1078Request the request to collect
     */
    @Override // io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector
    public void collect(Jt1078Request jt1078Request) {
        doCollect(jt1078Request);
    }

    /**
     * Handles a single request. Implemented by subclasses.
     *
     * @param jt1078Request the request to handle
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void doCollect(Jt1078Request jt1078Request);

    /**
     * Passes every currently registered subscriber to {@code consumer}.
     *
     * @param consumer action to run for each registered subscriber
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void forEachSubscriber(Consumer<IST> consumer) {
        this.subscriberMapping.values().forEach(consumer);
    }

    /**
     * Creates the internal bookkeeping object for a new subscriber.
     *
     * @param str                     id of the new subscriber
     * @param jt1078SubscriberCreator the parameters the subscription was requested with
     * @param fluxSink                sink through which data is pushed to the subscriber
     * @return the internal subscriber to register under {@code str}
     */
    protected abstract IST createSubscribe(String str, Jt1078SubscriberCreator jt1078SubscriberCreator, FluxSink<PT> fluxSink);

    /**
     * Creates a new subscriber for the sim / channel number described by the creator.
     *
     * <p>The returned subscriber wraps a {@link Flux} that registers its sink in
     * {@link #subscriberMapping} when it is subscribed to, applies the creator's timeout, and
     * removes the registration again once the flux terminates.
     *
     * @param jt1078SubscriberCreator describes the sim, channel number and timeout to use
     * @return a subscriber exposing the generated id and the data stream
     */
    @Override // io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector
    public Jt1078Subscriber<PT> doSubscribe(Jt1078SubscriberCreator jt1078SubscriberCreator) {
        final String subscriberId = Jt1078Subscriber.SubscriberKey.ofUuid(jt1078SubscriberCreator.sim(), jt1078SubscriberCreator.channelNumber());

        // The explicit <PT> witness is required: as the receiver of a call chain, Flux.create(...)
        // has no target type, so the sink would otherwise be inferred as FluxSink<Object> and
        // could not be handed to createSubscribe(...).
        final Flux<PT> dataStream = Flux.<PT>create(fluxSink -> {
            log.info("new subscriber created with id: {}", subscriberId);
            final IST subscriber = createSubscribe(subscriberId, jt1078SubscriberCreator, fluxSink);
            // NOTE(review): put/remove on a ConcurrentMap are already atomic; the monitor is kept
            // in case subclasses synchronize on the same map - confirm before removing.
            synchronized (this.subscriberMapping) {
                this.subscriberMapping.put(subscriberId, subscriber);
            }
        }).timeout(jt1078SubscriberCreator.timeout()).doFinally(signalType -> {
            log.info("Subscriber {} removed", subscriberId);
            synchronized (this.subscriberMapping) {
                this.subscriberMapping.remove(subscriberId);
            }
        });

        return new DefaultJt1078Subscriber<>(subscriberId, dataStream);
    }

    /**
     * Terminates the subscriber registered under the given id, if there is one.
     *
     * <p>This method only signals the sink; it does not touch {@link #subscriberMapping} itself.
     * For subscribers created through {@link #doSubscribe(Jt1078SubscriberCreator)} the entry is
     * removed by the flux's termination hook.
     *
     * @param str                            id of the subscriber to terminate
     * @param jt1078SubscriberCloseException error to signal, or {@code null} to complete normally
     */
    @Override // io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector
    public void unsubscribe(String str, @Nullable Jt1078SubscriberCloseException jt1078SubscriberCloseException) {
        final IST subscriber = this.subscriberMapping.get(str);
        if (subscriber == null) {
            return;
        }
        terminate(subscriber, jt1078SubscriberCloseException);
    }

    /**
     * Terminates every registered subscriber.
     *
     * <p>A failure while terminating one subscriber is logged and does not prevent the remaining
     * subscribers from being terminated.
     *
     * @param jt1078SubscriberCloseException error to signal, or {@code null} to complete normally
     */
    @Override // io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector
    public void unsubscribe(@Nullable Jt1078SubscriberCloseException jt1078SubscriberCloseException) {
        this.subscriberMapping.values().forEach(internalSubscriber -> {
            try {
                terminate(internalSubscriber, jt1078SubscriberCloseException);
            } catch (Throwable th) {
                log.error(th.getMessage(), th);
            }
        });
    }

    /**
     * Returns a descriptor for every currently registered subscriber.
     *
     * @return a stream of descriptors built from the registered internal subscribers
     */
    @Override // io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector
    public Stream<Jt1078SubscriberDescriptor> list() {
        return this.subscriberMapping.values().stream().map(internalSubscriber -> new DefaultJt1078SubscriberDescriptor(
                internalSubscriber.id(),
                internalSubscriber.sim(),
                internalSubscriber.channel(),
                internalSubscriber.createdAt(),
                internalSubscriber.desc(),
                internalSubscriber.metadata()));
    }

    /** Completes the subscriber's sink, or signals {@code reason} as an error when it is non-null. */
    private void terminate(IST subscriber, @Nullable Jt1078SubscriberCloseException reason) {
        if (reason == null) {
            subscriber.sink().complete();
        } else {
            subscriber.sink().error(reason);
        }
    }
}
