package io.github.hylexus.jt.jt1078.spec.impl.subscription;

import io.github.hylexus.jt.jt1078.spec.Jt1078PublisherInternal;
import io.github.hylexus.jt.jt1078.spec.Jt1078Request;
import io.github.hylexus.jt.jt1078.spec.Jt1078Subscriber;
import io.github.hylexus.jt.jt1078.spec.Jt1078SubscriberDescriptor;
import io.github.hylexus.jt.jt1078.spec.Jt1078Subscription;
import io.github.hylexus.jt.jt1078.spec.Jt1078TerminalIdConverter;
import io.github.hylexus.jt.jt1078.spec.exception.Jt1078SubscriberCloseException;
import io.github.hylexus.jt.jt1078.support.codec.Jt1078ChannelCollector;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.springframework.lang.Nullable;

/* loaded from: input_file:io/github/hylexus/jt/jt1078/spec/impl/subscription/DefaultJt1078Publisher.class */
public class DefaultJt1078Publisher implements Jt1078PublisherInternal {
    private final ReentrantReadWriteLock.ReadLock readLock;
    private final ReentrantReadWriteLock.WriteLock writeLock;
    private final Map<Jt1078Subscriber.SubscriberKey, Map<Class<? extends Jt1078ChannelCollector<? extends Jt1078Subscription>>, Jt1078ChannelCollector<? extends Jt1078Subscription>>> collectors = new ConcurrentHashMap();
    private final Jt1078ChannelCollectorFactory collectorFactory;
    private final Jt1078TerminalIdConverter jt1078TerminalIdConverter;

    public DefaultJt1078Publisher(Jt1078ChannelCollectorFactory jt1078ChannelCollectorFactory, Jt1078TerminalIdConverter jt1078TerminalIdConverter) {
        ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        this.readLock = reentrantReadWriteLock.readLock();
        this.writeLock = reentrantReadWriteLock.writeLock();
        this.collectorFactory = jt1078ChannelCollectorFactory;
        this.jt1078TerminalIdConverter = jt1078TerminalIdConverter;
    }

    @Override // io.github.hylexus.jt.jt1078.spec.Jt1078Publisher
    public Jt1078TerminalIdConverter terminalIdConverter() {
        return this.jt1078TerminalIdConverter;
    }

    @Override // io.github.hylexus.jt.jt1078.spec.Jt1078Publisher
    public <S extends Jt1078Subscription> Jt1078Subscriber<S> doSubscribe(Class<? extends Jt1078ChannelCollector<S>> cls, String str, short s, Duration duration) {
        return (Jt1078Subscriber<S>) getOrCreate(Jt1078Subscriber.SubscriberKey.of(str, s), cls).doSubscribe(str, s, duration);
    }

    private <S extends Jt1078Subscription> Jt1078ChannelCollector<? extends Jt1078Subscription> getOrCreate(Jt1078Subscriber.SubscriberKey subscriberKey, Class<? extends Jt1078ChannelCollector<S>> cls) {
        this.writeLock.lock();
        try {
            Jt1078ChannelCollector<? extends Jt1078Subscription> computeIfAbsent = this.collectors.computeIfAbsent(subscriberKey, subscriberKey2 -> {
                return new ConcurrentHashMap();
            }).computeIfAbsent(cls, cls2 -> {
                return this.collectorFactory.create(cls);
            });
            this.writeLock.unlock();
            return computeIfAbsent;
        } catch (Throwable th) {
            this.writeLock.unlock();
            throw th;
        }
    }

    @Override // io.github.hylexus.jt.jt1078.spec.Jt1078Publisher
    public void unsubscribe(String str, @Nullable Jt1078SubscriberCloseException jt1078SubscriberCloseException) {
        doUnsubscribe(jt1078SubscriberCloseException, entry -> {
            Jt1078Subscriber.SubscriberKey subscriberKey = (Jt1078Subscriber.SubscriberKey) entry.getKey();
            return str.startsWith(Jt1078Subscriber.SubscriberKey.prefix(subscriberKey.getSim(), subscriberKey.getChannel()));
        });
    }

    @Override // io.github.hylexus.jt.jt1078.spec.Jt1078Publisher
    public void unsubscribeWithSim(String str, Jt1078SubscriberCloseException jt1078SubscriberCloseException) {
        doUnsubscribe(jt1078SubscriberCloseException, entry -> {
            return ((Jt1078Subscriber.SubscriberKey) entry.getKey()).getSim().equals(str);
        });
    }

    private void doUnsubscribe(Jt1078SubscriberCloseException jt1078SubscriberCloseException, Predicate<Map.Entry<Jt1078Subscriber.SubscriberKey, Map<Class<? extends Jt1078ChannelCollector<? extends Jt1078Subscription>>, Jt1078ChannelCollector<? extends Jt1078Subscription>>>> predicate) {
        this.writeLock.lock();
        try {
            Iterator<Map.Entry<Jt1078Subscriber.SubscriberKey, Map<Class<? extends Jt1078ChannelCollector<? extends Jt1078Subscription>>, Jt1078ChannelCollector<? extends Jt1078Subscription>>>> it = this.collectors.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Jt1078Subscriber.SubscriberKey, Map<Class<? extends Jt1078ChannelCollector<? extends Jt1078Subscription>>, Jt1078ChannelCollector<? extends Jt1078Subscription>>> next = it.next();
                if (predicate.test(next)) {
                    it.remove();
                    next.getValue().values().forEach(jt1078ChannelCollector -> {
                        jt1078ChannelCollector.unsubscribe(jt1078SubscriberCloseException);
                    });
                }
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    @Override // io.github.hylexus.jt.jt1078.spec.Jt1078PublisherInternal
    public void publish(Jt1078Request jt1078Request) {
        Map<Class<? extends Jt1078ChannelCollector<? extends Jt1078Subscription>>, Jt1078ChannelCollector<? extends Jt1078Subscription>> collectorsForKey = getCollectorsForKey(Jt1078Subscriber.SubscriberKey.of(jt1078Request));
        if (collectorsForKey != null) {
            collectorsForKey.values().forEach(jt1078ChannelCollector -> {
                jt1078ChannelCollector.collect(jt1078Request);
            });
        }
    }

    private Map<Class<? extends Jt1078ChannelCollector<? extends Jt1078Subscription>>, Jt1078ChannelCollector<? extends Jt1078Subscription>> getCollectorsForKey(Jt1078Subscriber.SubscriberKey subscriberKey) {
        this.readLock.lock();
        try {
            return this.collectors.get(subscriberKey);
        } finally {
            this.readLock.unlock();
        }
    }

    @Override // io.github.hylexus.jt.jt1078.spec.Jt1078PublisherManager
    public Stream<Jt1078SubscriberDescriptor> list() {
        this.readLock.lock();
        try {
            return this.collectors.values().stream().flatMap(map -> {
                return map.values().stream().flatMap((v0) -> {
                    return v0.list();
                });
            });
        } finally {
            this.readLock.unlock();
        }
    }
}
