package de.quantummaid.messagemaid.messageBus;

import de.quantummaid.messagemaid.channel.Channel;
import de.quantummaid.messagemaid.filtering.Filter;
import de.quantummaid.messagemaid.filtering.FilterActions;
import de.quantummaid.messagemaid.identification.CorrelationId;
import de.quantummaid.messagemaid.identification.MessageId;
import de.quantummaid.messagemaid.internal.exceptions.BubbleUpWrappedException;
import de.quantummaid.messagemaid.messageBus.exception.MessageBusExceptionListener;
import de.quantummaid.messagemaid.messageBus.internal.MessageBusStatusInformationAdapter;
import de.quantummaid.messagemaid.messageBus.internal.brokering.MessageBusBrokerStrategy;
import de.quantummaid.messagemaid.messageBus.internal.correlationIds.CorrelationBasedSubscriptions;
import de.quantummaid.messagemaid.messageBus.internal.exception.ExceptionListenerHandler;
import de.quantummaid.messagemaid.messageBus.internal.statistics.ChannelBasedMessageBusStatisticsCollector;
import de.quantummaid.messagemaid.processingContext.EventType;
import de.quantummaid.messagemaid.processingContext.ProcessingContext;
import de.quantummaid.messagemaid.subscribing.ConsumerSubscriber;
import de.quantummaid.messagemaid.subscribing.Subscriber;
import de.quantummaid.messagemaid.subscribing.SubscriptionId;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/* loaded from: input_file:de/quantummaid/messagemaid/messageBus/MessageBusImpl.class */
final class MessageBusImpl implements MessageBus {
    private final Channel<Object> acceptingChannel;
    private final MessageBusBrokerStrategy brokerStrategy;
    private final CorrelationBasedSubscriptions correlationBasedSubscriptions;
    private final ExceptionListenerHandler exceptionListenerHandler;
    private MessageBusStatusInformationAdapter statusInformationAdapter;

    /* loaded from: input_file:de/quantummaid/messagemaid/messageBus/MessageBusImpl$FilterAdapter.class */
    private static final class FilterAdapter implements Filter<ProcessingContext<Object>> {
        private final Filter<Object> delegate;

        @Override // de.quantummaid.messagemaid.filtering.Filter
        public void apply(final ProcessingContext<Object> processingContext, final FilterActions<ProcessingContext<Object>> filterActions) {
            final Object payload = processingContext.getPayload();
            this.delegate.apply(payload, new FilterActions<Object>() { // from class: de.quantummaid.messagemaid.messageBus.MessageBusImpl.FilterAdapter.1
                @Override // de.quantummaid.messagemaid.filtering.FilterActions
                public void block(Object obj) {
                    if (payload != obj) {
                        processingContext.setPayload(obj);
                    }
                    filterActions.block(processingContext);
                }

                @Override // de.quantummaid.messagemaid.filtering.FilterActions
                public void pass(Object obj) {
                    if (payload != obj) {
                        processingContext.setPayload(obj);
                    }
                    filterActions.pass(processingContext);
                }
            });
        }

        private FilterAdapter(Filter<Object> filter) {
            this.delegate = filter;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageBusImpl(Channel<Object> channel, MessageBusBrokerStrategy messageBusBrokerStrategy, CorrelationBasedSubscriptions correlationBasedSubscriptions, ExceptionListenerHandler exceptionListenerHandler) {
        this.acceptingChannel = channel;
        this.brokerStrategy = messageBusBrokerStrategy;
        this.correlationBasedSubscriptions = correlationBasedSubscriptions;
        this.exceptionListenerHandler = exceptionListenerHandler;
        this.statusInformationAdapter = MessageBusStatusInformationAdapter.statusInformationAdapter(ChannelBasedMessageBusStatisticsCollector.channelBasedMessageBusStatisticsCollector(channel), messageBusBrokerStrategy, exceptionListenerHandler);
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public MessageId send(EventType eventType, Object obj) {
        return send(ProcessingContext.processingContext(eventType, obj));
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public MessageId send(EventType eventType, Object obj, CorrelationId correlationId) {
        return send(ProcessingContext.processingContext(eventType, obj, correlationId));
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public MessageId send(ProcessingContext<Object> processingContext) {
        try {
            return this.acceptingChannel.send(processingContext);
        } catch (BubbleUpWrappedException e) {
            throw ((RuntimeException) e.getCause());
        }
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public SubscriptionId subscribe(EventType eventType, Consumer<Object> consumer) {
        return subscribe(eventType, ConsumerSubscriber.consumerSubscriber(consumer));
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public SubscriptionId subscribe(EventType eventType, Subscriber<Object> subscriber) {
        this.brokerStrategy.addSubscriber(eventType, subscriber);
        return subscriber.getSubscriptionId();
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public SubscriptionId subscribe(CorrelationId correlationId, Consumer<ProcessingContext<Object>> consumer) {
        return subscribe(correlationId, ConsumerSubscriber.consumerSubscriber(consumer));
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public SubscriptionId subscribe(CorrelationId correlationId, Subscriber<ProcessingContext<Object>> subscriber) {
        return this.correlationBasedSubscriptions.addCorrelationBasedSubscriber(correlationId, subscriber);
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public SubscriptionId subscribeRaw(EventType eventType, Consumer<ProcessingContext<Object>> consumer) {
        return subscribeRaw(eventType, ConsumerSubscriber.consumerSubscriber(consumer));
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public SubscriptionId subscribeRaw(EventType eventType, Subscriber<ProcessingContext<Object>> subscriber) {
        this.brokerStrategy.addRawSubscriber(eventType, subscriber);
        return subscriber.getSubscriptionId();
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public void unsubcribe(SubscriptionId subscriptionId) {
        this.brokerStrategy.removeSubscriber(subscriptionId);
        this.correlationBasedSubscriptions.unsubscribe(subscriptionId);
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public void add(Filter<Object> filter) {
        this.acceptingChannel.addProcessFilter(new FilterAdapter(filter));
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public void add(Filter<Object> filter, int i) {
        this.acceptingChannel.addProcessFilter(new FilterAdapter(filter), i);
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public void addRaw(Filter<ProcessingContext<Object>> filter) {
        this.acceptingChannel.addProcessFilter(filter);
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public void addRaw(Filter<ProcessingContext<Object>> filter, int i) {
        this.acceptingChannel.addProcessFilter(filter, i);
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public List<Filter<Object>> getFilter() {
        LinkedList linkedList = new LinkedList();
        for (Filter<ProcessingContext<Object>> filter : this.acceptingChannel.getProcessFilter()) {
            if (!(filter instanceof FilterAdapter)) {
                throw new IllegalStateException("Unexpected type of filter. Was the list of filter tampered with?");
            }
            linkedList.add(((FilterAdapter) filter).delegate);
        }
        return linkedList;
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public void remove(Filter<Object> filter) {
        for (Filter<ProcessingContext<Object>> filter2 : this.acceptingChannel.getProcessFilter()) {
            if ((filter2 instanceof FilterAdapter) && ((FilterAdapter) filter2).delegate.equals(filter)) {
                this.acceptingChannel.removeProcessFilter(filter2);
            }
        }
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public SubscriptionId onException(EventType eventType, MessageBusExceptionListener messageBusExceptionListener) {
        return this.exceptionListenerHandler.register(eventType, messageBusExceptionListener);
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public SubscriptionId onException(CorrelationId correlationId, MessageBusExceptionListener messageBusExceptionListener) {
        return this.exceptionListenerHandler.register(correlationId, messageBusExceptionListener);
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public void unregisterExceptionListener(SubscriptionId subscriptionId) {
        this.exceptionListenerHandler.unregister(subscriptionId);
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public MessageBusStatusInformation getStatusInformation() {
        return this.statusInformationAdapter;
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public void close(boolean z) {
        this.acceptingChannel.close(z);
    }

    @Override // de.quantummaid.messagemaid.internal.autoclosable.NoErrorAutoClosable, java.lang.AutoCloseable
    public void close() {
        close(true);
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public boolean awaitTermination(long j, TimeUnit timeUnit) {
        return true;
    }

    @Override // de.quantummaid.messagemaid.messageBus.MessageBus
    public boolean isClosed() {
        return true;
    }
}
