package de.quantummaid.eventmaid.internal.pipe;

import de.quantummaid.eventmaid.exceptions.AlreadyClosedException;
import de.quantummaid.eventmaid.internal.exceptions.BubbleUpWrappedException;
import de.quantummaid.eventmaid.internal.pipe.exceptions.PipeErrorHandler;
import de.quantummaid.eventmaid.internal.pipe.statistics.PipeStatistics;
import de.quantummaid.eventmaid.internal.pipe.statistics.PipeStatisticsCollector;
import de.quantummaid.eventmaid.internal.pipe.transport.TransportMechanism;
import de.quantummaid.eventmaid.subscribing.ConsumerSubscriber;
import de.quantummaid.eventmaid.subscribing.Subscriber;
import de.quantummaid.eventmaid.subscribing.SubscriptionId;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/* loaded from: input_file:de/quantummaid/eventmaid/internal/pipe/PipeImpl.class */
/**
 * {@link Pipe} implementation that hands every sent message to a {@link TransportMechanism}
 * and manages the subscriber list it was constructed with.
 *
 * <p>Once {@link #close(boolean)} has been called, {@link #send(Object)},
 * {@link #subscribe(Subscriber)} and {@link #unsubscribe(SubscriptionId)} throw
 * {@link AlreadyClosedException}.
 *
 * @param <T> type of the messages sent through this pipe
 */
public final class PipeImpl<T> implements Pipe<T> {
    private final TransportMechanism<T> transportMechanism;
    private final PipeStatisticsCollector statisticsCollector;
    private final List<Subscriber<T>> subscribers;
    private final PipeErrorHandler<T> errorHandler;
    // Set once by close(boolean); volatile so other threads observe the closed flag.
    private volatile boolean closedAlreadyCalled;

    /**
     * Creates a pipe from its collaborators. The arguments are stored as given; the subscriber
     * list is used directly (not copied) and is mutated by subscribe/unsubscribe.
     *
     * @param transportMechanism  mechanism that delivers sent messages
     * @param statisticsCollector source of the statistics exposed by {@link #getStatusInformation()}
     * @param subscribers         list holding the current subscribers
     * @param errorHandler        receives {@link BubbleUpWrappedException}s raised during transport
     */
    public PipeImpl(TransportMechanism<T> transportMechanism,
                    PipeStatisticsCollector statisticsCollector,
                    List<Subscriber<T>> subscribers,
                    PipeErrorHandler<T> errorHandler) {
        this.transportMechanism = transportMechanism;
        this.statisticsCollector = statisticsCollector;
        this.subscribers = subscribers;
        this.errorHandler = errorHandler;
    }

    /**
     * Sends a message through the transport mechanism.
     *
     * @param message the message to transport
     * @throws AlreadyClosedException if {@link #close(boolean)} has already been called
     */
    @Override
    public void send(T message) {
        ensureNotClosed();
        transport(message);
    }

    /**
     * Passes the message to the transport mechanism and forwards a
     * {@link BubbleUpWrappedException} to the error handler together with the message.
     */
    private void transport(T message) {
        try {
            this.transportMechanism.transport(message);
        } catch (BubbleUpWrappedException e) {
            this.errorHandler.handleBubbledUpException(message, e);
        }
    }

    /**
     * Registers a subscriber.
     *
     * @param subscriber the subscriber to add
     * @return the subscriber's own {@link SubscriptionId}
     * @throws AlreadyClosedException if {@link #close(boolean)} has already been called
     */
    @Override
    public SubscriptionId subscribe(Subscriber<T> subscriber) {
        ensureNotClosed();
        // Resolve the id BEFORE touching the list: a null subscriber (or one whose id lookup
        // throws) must fail without leaving a broken entry behind in the subscriber list.
        final SubscriptionId subscriptionId = subscriber.getSubscriptionId();
        this.subscribers.add(subscriber);
        return subscriptionId;
    }

    /**
     * Wraps the consumer in a {@link ConsumerSubscriber} and registers it.
     *
     * @param consumer callback to wrap
     * @return the {@link SubscriptionId} of the created subscriber
     * @throws AlreadyClosedException if {@link #close(boolean)} has already been called
     */
    @Override
    public SubscriptionId subscribe(Consumer<T> consumer) {
        return subscribe(ConsumerSubscriber.consumerSubscriber(consumer));
    }

    /**
     * Removes every subscriber whose {@link SubscriptionId} equals the given one.
     *
     * @param subscriptionId id of the subscription to remove
     * @throws AlreadyClosedException if {@link #close(boolean)} has already been called
     */
    @Override
    public void unsubscribe(SubscriptionId subscriptionId) {
        ensureNotClosed();
        this.subscribers.removeIf(subscriber -> subscriber.getSubscriptionId().equals(subscriptionId));
    }

    /**
     * Returns an object giving access to the collector's current statistics and to the
     * subscriber list. The list returned by {@code getAllSubscribers()} is the pipe's own list
     * instance, not a copy.
     */
    @Override
    public PipeStatusInformation<T> getStatusInformation() {
        return new PipeStatusInformation<T>() {
            @Override
            public PipeStatistics getCurrentMessageStatistics() {
                return PipeImpl.this.statisticsCollector.getCurrentStatistics();
            }

            @Override
            public List<Subscriber<T>> getAllSubscribers() {
                return PipeImpl.this.subscribers;
            }
        };
    }

    /**
     * Marks the pipe as closed and closes the transport mechanism. Calls made after the closed
     * flag is set return without doing anything.
     *
     * @param finishRemainingTasks passed through unchanged to {@code TransportMechanism.close}
     */
    @Override
    public void close(boolean finishRemainingTasks) {
        // NOTE(review): check-then-set on the volatile flag is not atomic; two threads calling
        // close concurrently may both reach transportMechanism.close — confirm that is tolerated.
        if (this.closedAlreadyCalled) {
            return;
        }
        this.closedAlreadyCalled = true;
        this.transportMechanism.close(finishRemainingTasks);
    }

    /**
     * Closes the pipe via {@code close(true)}.
     */
    @Override
    public void close() {
        close(true);
    }

    /**
     * @return {@code true} only if close has been called and the transport mechanism reports
     *         that it is shut down; {@code false} otherwise
     */
    @Override
    public boolean isClosed() {
        return this.closedAlreadyCalled && this.transportMechanism.isShutdown();
    }

    /**
     * Delegates to the transport mechanism's {@code awaitTermination} once close has been
     * called. If close has not been called, returns {@code false} immediately without waiting.
     *
     * @param timeout  timeout value handed to the transport mechanism
     * @param timeUnit unit of {@code timeout}
     * @return the transport mechanism's result, or {@code false} if the pipe was never closed
     * @throws InterruptedException if the transport mechanism's wait is interrupted
     */
    @Override
    public boolean awaitTermination(int timeout, TimeUnit timeUnit) throws InterruptedException {
        if (!this.closedAlreadyCalled) {
            return false;
        }
        return this.transportMechanism.awaitTermination(timeout, timeUnit);
    }

    /** Throws {@link AlreadyClosedException} when close has already been called. */
    private void ensureNotClosed() {
        if (this.closedAlreadyCalled) {
            throw new AlreadyClosedException();
        }
    }
}
