package io.github.hylexus.xtream.codec.server.reactive.spec.event.builtin;

import io.github.hylexus.xtream.codec.server.reactive.spec.XtreamSchedulerRegistry;
import io.github.hylexus.xtream.codec.server.reactive.spec.event.XtreamEvent;
import io.github.hylexus.xtream.codec.server.reactive.spec.event.XtreamEventPublisher;
import io.github.hylexus.xtream.codec.server.reactive.spec.event.XtreamEventSubscriberInfo;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:io/github/hylexus/xtream/codec/server/reactive/spec/event/builtin/DefaultXtreamEventPublisher.class */
public class DefaultXtreamEventPublisher implements XtreamEventPublisher {
    private static final Logger log = LoggerFactory.getLogger(DefaultXtreamEventPublisher.class);
    private final XtreamSchedulerRegistry schedulerRegistry;
    private final ConcurrentMap<String, XtreamEventSubscriberInfo> subscribers = new ConcurrentHashMap();
    private final ConcurrentMap<Integer, Integer> interestedEventTypes = new ConcurrentHashMap();
    private final Sinks.Many<XtreamEvent> sink = Sinks.many().multicast().onBackpressureBuffer(1, false);
    protected Predicate<XtreamEvent.XtreamEventType> defaultPredicate = xtreamEventType -> {
        if (this.subscribers.isEmpty()) {
            return false;
        }
        int code = xtreamEventType.code();
        if (this.interestedEventTypes.containsKey(Integer.valueOf(XtreamEvent.DefaultXtreamEventType.ALL.code()))) {
            return true;
        }
        Integer num = this.interestedEventTypes.get(Integer.valueOf(code));
        return num != null && num.intValue() > 0;
    };

    public DefaultXtreamEventPublisher(XtreamSchedulerRegistry xtreamSchedulerRegistry) {
        this.schedulerRegistry = xtreamSchedulerRegistry;
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.event.XtreamEventPublisher
    public void publish(XtreamEvent xtreamEvent) {
        Sinks.EmitResult tryEmitNext = this.sink.tryEmitNext(xtreamEvent);
        if (tryEmitNext.isFailure()) {
            log.error("Failed to publish event {} with result {}", xtreamEvent, tryEmitNext);
        }
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.event.XtreamEventPublisher
    public Predicate<XtreamEvent.XtreamEventType> eventPredicate() {
        return this.defaultPredicate;
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.event.XtreamEventPublisher
    public Flux<XtreamEvent> subscribe(XtreamEventSubscriberInfo xtreamEventSubscriberInfo) {
        Set<Integer> interestedEventsCode = xtreamEventSubscriberInfo.interestedEventsCode();
        if (interestedEventsCode.isEmpty()) {
            log.error("Interested events code cannot be empty for subscriber: {}", xtreamEventSubscriberInfo);
            return Flux.error(new IllegalArgumentException("Interested events code cannot be empty"));
        }
        if (this.subscribers.putIfAbsent(xtreamEventSubscriberInfo.id(), xtreamEventSubscriberInfo) != null) {
            log.error("Subscriber already exists: {}", xtreamEventSubscriberInfo);
            return Flux.error(new IllegalStateException("Subscriber already exists: " + String.valueOf(xtreamEventSubscriberInfo)));
        }
        synchronized (this.interestedEventTypes) {
            Iterator<Integer> it = interestedEventsCode.iterator();
            while (it.hasNext()) {
                this.interestedEventTypes.compute(it.next(), (num, num2) -> {
                    return Integer.valueOf(num2 == null ? 1 : num2.intValue() + 1);
                });
            }
        }
        return this.sink.asFlux().filter(xtreamEvent -> {
            return interestedEventsCode.contains(Integer.valueOf(XtreamEvent.XtreamEventType.ALL.code())) || interestedEventsCode.contains(Integer.valueOf(xtreamEvent.type().code()));
        }).publishOn(this.schedulerRegistry.eventPublisherScheduler()).doFinally(signalType -> {
            unsubscribe(xtreamEventSubscriberInfo);
        });
    }

    protected void unsubscribe(XtreamEventSubscriberInfo xtreamEventSubscriberInfo) {
        this.subscribers.remove(xtreamEventSubscriberInfo.id());
        synchronized (this.interestedEventTypes) {
            Iterator<Integer> it = xtreamEventSubscriberInfo.interestedEventsCode().iterator();
            while (it.hasNext()) {
                this.interestedEventTypes.compute(Integer.valueOf(it.next().intValue()), (num, num2) -> {
                    if (num2 == null || num2.intValue() <= 1) {
                        return null;
                    }
                    return Integer.valueOf(num2.intValue() - 1);
                });
            }
        }
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.event.XtreamEventPublisher
    public void shutdown() {
        this.sink.tryEmitComplete();
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.event.XtreamEventPublisher
    public Stream<XtreamEventSubscriberInfo> subscriberView() {
        return this.subscribers.values().stream();
    }

    @Override // io.github.hylexus.xtream.codec.server.reactive.spec.event.XtreamEventPublisher
    public int subscriberCount() {
        return this.subscribers.size();
    }
}
