package org.reactivecommons.async.rabbit.listeners;

import com.rabbitmq.client.AMQP;
import java.util.Optional;
import java.util.function.Function;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.commons.CommandExecutor;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.rabbit.HandlerResolver;
import org.reactivecommons.async.rabbit.RabbitMessage;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;

/* loaded from: input_file:org/reactivecommons/async/rabbit/listeners/ApplicationCommandListener.class */
public class ApplicationCommandListener extends GenericMessageListener {

    @Generated
    private static final Logger log = Logger.getLogger(ApplicationCommandListener.class.getName());
    private final MessageConverter messageConverter;
    private final HandlerResolver resolver;
    private final String directExchange;
    private final boolean withDLQRetry;
    private final boolean delayedCommands;
    private final int retryDelay;
    private final Optional<Integer> maxLengthBytes;

    public ApplicationCommandListener(ReactiveMessageListener reactiveMessageListener, String str, HandlerResolver handlerResolver, String str2, MessageConverter messageConverter, boolean z, boolean z2, boolean z3, long j, int i, Optional<Integer> optional, DiscardNotifier discardNotifier, CustomReporter customReporter) {
        super(str, reactiveMessageListener, z, z2, j, i, discardNotifier, "command", customReporter);
        this.retryDelay = i;
        this.withDLQRetry = z;
        this.delayedCommands = z3;
        this.resolver = handlerResolver;
        this.directExchange = str2;
        this.messageConverter = messageConverter;
        this.maxLengthBytes = optional;
    }

    @Override // org.reactivecommons.async.rabbit.listeners.GenericMessageListener
    protected Mono<Void> setUpBindings(TopologyCreator topologyCreator) {
        Mono<AMQP.Exchange.DeclareOk> declare = topologyCreator.declare(ExchangeSpecification.exchange(this.directExchange).durable(true).type("direct"));
        if (!this.withDLQRetry) {
            Mono<AMQP.Queue.DeclareOk> declareQueue = topologyCreator.declareQueue(this.queueName, this.maxLengthBytes);
            return declare.then(declareQueue).then(topologyCreator.bind(BindingSpecification.binding(this.directExchange, this.queueName, this.queueName))).then(declareDelayedTopology(topologyCreator)).then();
        }
        Mono<AMQP.Exchange.DeclareOk> declare2 = topologyCreator.declare(ExchangeSpecification.exchange(this.directExchange + ".DLQ").durable(true).type("direct"));
        Mono<AMQP.Queue.DeclareOk> declareQueue2 = topologyCreator.declareQueue(this.queueName, this.directExchange + ".DLQ", this.maxLengthBytes);
        Mono<AMQP.Queue.DeclareOk> declareDLQ = topologyCreator.declareDLQ(this.queueName, this.directExchange, this.retryDelay, this.maxLengthBytes);
        return declare.then(declare2).then(declareDLQ).then(declareQueue2).then(topologyCreator.bind(BindingSpecification.binding(this.directExchange + ".DLQ", this.queueName, this.queueName + ".DLQ"))).then(topologyCreator.bind(BindingSpecification.binding(this.directExchange, this.queueName, this.queueName))).then(declareDelayedTopology(topologyCreator)).then();
    }

    private Mono<Void> declareDelayedTopology(TopologyCreator topologyCreator) {
        if (!this.delayedCommands) {
            return Mono.empty();
        }
        String str = this.queueName + "-delayed";
        return topologyCreator.declareQueue(str, this.directExchange, this.maxLengthBytes, Optional.of(this.queueName)).then(topologyCreator.bind(BindingSpecification.binding(this.directExchange, str, str))).then();
    }

    @Override // org.reactivecommons.async.rabbit.listeners.GenericMessageListener
    protected Function<Message, Mono<Object>> rawMessageHandler(String str) {
        RegisteredCommandHandler commandHandler = this.resolver.getCommandHandler(str);
        Class inputClass = commandHandler.getInputClass();
        CommandExecutor commandExecutor = new CommandExecutor(commandHandler.getHandler(), message -> {
            return this.messageConverter.readCommand(message, inputClass);
        });
        return message2 -> {
            return commandExecutor.execute(message2).cast(Object.class);
        };
    }

    @Override // org.reactivecommons.async.rabbit.listeners.GenericMessageListener
    protected String getExecutorPath(AcknowledgableDelivery acknowledgableDelivery) {
        return this.messageConverter.readCommandStructure(RabbitMessage.fromDelivery(acknowledgableDelivery)).getName();
    }

    @Override // org.reactivecommons.async.rabbit.listeners.GenericMessageListener
    protected Object parseMessageForReporter(Message message) {
        return this.messageConverter.readCommandStructure(message);
    }
}
