package de.digitalcollections.workflow.engine.messagebroker;

import de.digitalcollections.workflow.engine.model.Envelope;
import de.digitalcollections.workflow.engine.model.Message;
import de.digitalcollections.workflow.engine.util.Maps;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/digitalcollections/workflow/engine/messagebroker/MessageBroker.class */
public class MessageBroker {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageBroker.class);
    private static final String MESSAGE_TTL = "x-message-ttl";
    private static final String DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
    private MessageBrokerConfig config;
    private RoutingConfig routingConfig;
    private final RabbitClient rabbitClient;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageBroker(MessageBrokerConfig messageBrokerConfig, RoutingConfig routingConfig, RabbitClient rabbitClient) throws IOException {
        this.config = messageBrokerConfig;
        this.routingConfig = routingConfig;
        this.rabbitClient = rabbitClient;
        provideExchanges();
        provideInputQueues();
        if (routingConfig.hasWriteTo()) {
            provideOutputQueue();
        }
    }

    public void send(Message message) throws IOException {
        this.rabbitClient.send(this.routingConfig.getExchange(), this.routingConfig.getWriteTo(), message);
    }

    public void send(String str, Message message) throws IOException {
        this.rabbitClient.send(this.routingConfig.getExchange(), str, message);
    }

    public void send(String str, Collection<Message> collection) throws IOException {
        Iterator<Message> it = collection.iterator();
        while (it.hasNext()) {
            send(str, it.next());
        }
    }

    public Message receive(String str) throws IOException {
        return this.rabbitClient.receive(str);
    }

    public Message receive() throws IOException {
        Message message = null;
        for (String str : this.routingConfig.getReadFrom()) {
            message = receive(str);
            if (message != null) {
                break;
            }
        }
        return message;
    }

    private void provideInputQueues() throws IOException {
        String deadLetterExchange = this.routingConfig.getDeadLetterExchange();
        String exchange = this.routingConfig.getExchange();
        for (String str : this.routingConfig.getReadFrom()) {
            FailurePolicy failurePolicy = this.routingConfig.getFailurePolicy(str);
            this.rabbitClient.declareQueue(str, exchange, str, null);
            if (failurePolicy.getRetryRoutingKey() != null) {
                this.rabbitClient.declareQueue(failurePolicy.getRetryRoutingKey(), deadLetterExchange, str, Maps.of(MESSAGE_TTL, Integer.valueOf(this.config.getDeadLetterWait()), DEAD_LETTER_EXCHANGE, exchange));
            }
            if (failurePolicy.getFailedRoutingKey() != null) {
                this.rabbitClient.declareQueue(failurePolicy.getFailedRoutingKey(), exchange, failurePolicy.getFailedRoutingKey(), null);
            }
        }
    }

    private void provideOutputQueue() throws IOException {
        this.rabbitClient.declareQueue(this.routingConfig.getWriteTo(), this.routingConfig.getExchange(), this.routingConfig.getWriteTo(), Maps.of(DEAD_LETTER_EXCHANGE, this.routingConfig.getDeadLetterExchange()));
    }

    public void ack(Message message) throws IOException {
        this.rabbitClient.ack(message);
    }

    public void reject(Message message) throws IOException {
        Envelope envelope = message.getEnvelope();
        ack(message);
        if (envelope.getRetries() >= this.config.getMaxRetries()) {
            fail(message);
        } else {
            envelope.setRetries(envelope.getRetries() + 1);
            retry(message);
        }
    }

    private void fail(Message message) throws IOException {
        LOGGER.debug("Send message to failed queue: " + message);
        String failedRoutingKey = this.routingConfig.getFailurePolicy(message).getFailedRoutingKey();
        if (failedRoutingKey != null) {
            send(failedRoutingKey, message);
        }
    }

    private void retry(Message message) throws IOException {
        LOGGER.debug("Send message to retry queue: " + message);
        if (this.routingConfig.getFailurePolicy(message).getRetryRoutingKey() != null) {
            this.rabbitClient.send(this.routingConfig.getDeadLetterExchange(), message.getEnvelope().getSource(), message);
        }
    }

    private void provideExchanges() throws IOException {
        this.rabbitClient.provideExchange(this.routingConfig.getExchange());
        this.rabbitClient.provideExchange(this.routingConfig.getDeadLetterExchange());
    }

    public MessageBrokerConfig getConfig() {
        return this.config;
    }
}
