package org.apache.camel.component.reactive.streams.engine;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConstants;
import org.apache.camel.component.reactive.streams.ReactiveStreamsEndpoint;
import org.apache.camel.component.reactive.streams.ReactiveStreamsHelper;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
import org.apache.camel.component.reactive.streams.api.DispatchCallback;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/reactive/streams/engine/CamelPublisher.class */
public class CamelPublisher implements Publisher<Exchange>, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(CamelPublisher.class);
    private final ExecutorService workerPool;
    private final String name;
    private final List<CamelSubscription> subscriptions = new CopyOnWriteArrayList();
    private ReactiveStreamsBackpressureStrategy backpressureStrategy;
    private ReactiveStreamsProducer producer;

    public CamelPublisher(ExecutorService executorService, CamelContext camelContext, String str) {
        this.workerPool = executorService;
        this.backpressureStrategy = camelContext.getComponent(ReactiveStreamsConstants.SCHEME).getBackpressureStrategy();
        this.name = str;
    }

    public void subscribe(Subscriber<? super Exchange> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber must not be null");
        CamelSubscription camelSubscription = new CamelSubscription(UUID.randomUUID().toString(), this.workerPool, this, this.name, this.backpressureStrategy, subscriber);
        this.subscriptions.add(camelSubscription);
        subscriber.onSubscribe(camelSubscription);
    }

    public void unsubscribe(CamelSubscription camelSubscription) {
        this.subscriptions.remove(camelSubscription);
    }

    public void publish(Exchange exchange) {
        LinkedList linkedList = new LinkedList(this.subscriptions);
        DispatchCallback<Exchange> callback = ReactiveStreamsHelper.getCallback(exchange);
        DispatchCallback<Exchange> dispatchCallback = callback;
        if (callback != null && linkedList.size() > 0) {
            AtomicInteger atomicInteger = new AtomicInteger(linkedList.size());
            AtomicReference atomicReference = new AtomicReference(null);
            dispatchCallback = ReactiveStreamsHelper.attachCallback(exchange, (exchange2, th) -> {
                atomicReference.compareAndSet(null, th);
                if (atomicInteger.decrementAndGet() == 0) {
                    callback.processed(exchange2, (Throwable) atomicReference.get());
                }
            });
        }
        if (linkedList.size() <= 0) {
            if (dispatchCallback != null) {
                dispatchCallback.processed(exchange, new IllegalStateException("The stream has no active subscriptions"));
            }
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Exchange published to {} subscriptions for the stream {}: {}", new Object[]{Integer.valueOf(linkedList.size()), this.name, exchange});
            }
            Iterator it = linkedList.iterator();
            while (it.hasNext()) {
                ((CamelSubscription) it.next()).publish(exchange);
            }
        }
    }

    public void attachProducer(ReactiveStreamsProducer reactiveStreamsProducer) {
        Objects.requireNonNull(reactiveStreamsProducer, "producer cannot be null, use the detach method");
        if (this.producer != null) {
            throw new IllegalStateException("A producer is already attached to the stream '" + this.name + "'");
        }
        this.producer = reactiveStreamsProducer;
        ReactiveStreamsEndpoint m7getEndpoint = reactiveStreamsProducer.m7getEndpoint();
        if (m7getEndpoint.getBackpressureStrategy() != null) {
            this.backpressureStrategy = m7getEndpoint.getBackpressureStrategy();
            Iterator<CamelSubscription> it = this.subscriptions.iterator();
            while (it.hasNext()) {
                it.next().setBackpressureStrategy(m7getEndpoint.getBackpressureStrategy());
            }
        }
    }

    public void detachProducer() {
        this.producer = null;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        Iterator<CamelSubscription> it = this.subscriptions.iterator();
        while (it.hasNext()) {
            it.next().signalCompletion();
        }
        this.subscriptions.clear();
    }

    public List<CamelSubscription> getSubscriptions() {
        return this.subscriptions;
    }
}
