package reactor.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownListener;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;

/* loaded from: input_file:reactor/rabbitmq/Sender.class */
public class Sender implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
    private static final Function<Connection, Channel> CHANNEL_CREATION_FUNCTION = new ChannelCreationFunction();
    private static final Function<Connection, Channel> CHANNEL_PROXY_CREATION_FUNCTION = new ChannelProxyCreationFunction();
    private final Mono<? extends Connection> connectionMono;
    private final Mono<? extends Channel> channelMono;
    private final BiConsumer<SignalType, Channel> channelCloseHandler;
    private final AtomicReference<Connection> connection;
    private final Mono<? extends Channel> resourceManagementChannelMono;
    private final Scheduler resourceManagementScheduler;
    private final boolean privateResourceManagementScheduler;
    private final Scheduler connectionSubscriptionScheduler;
    private final boolean privateConnectionSubscriptionScheduler;
    private final ExecutorService channelCloseThreadPool;
    private final int connectionClosingTimeout;
    private final AtomicBoolean closingOrClosed;
    private static final String REACTOR_RABBITMQ_DELIVERY_TAG_HEADER = "reactor_rabbitmq_delivery_tag";

    /* loaded from: input_file:reactor/rabbitmq/Sender$ChannelCreationFunction.class */
    private static class ChannelCreationFunction implements Function<Connection, Channel> {
        private ChannelCreationFunction() {
        }

        @Override // java.util.function.Function
        public Channel apply(Connection connection) {
            try {
                return connection.createChannel();
            } catch (IOException e) {
                throw new RabbitFluxException("Error while creating channel", e);
            }
        }
    }

    /* loaded from: input_file:reactor/rabbitmq/Sender$ChannelProxyCreationFunction.class */
    private static class ChannelProxyCreationFunction implements Function<Connection, Channel> {
        private ChannelProxyCreationFunction() {
        }

        @Override // java.util.function.Function
        public Channel apply(Connection connection) {
            try {
                return new ChannelProxy(connection);
            } catch (IOException e) {
                throw new RabbitFluxException("Error while creating channel", e);
            }
        }
    }

    /* loaded from: input_file:reactor/rabbitmq/Sender$ConfirmSendContext.class */
    public static class ConfirmSendContext<OMSG extends OutboundMessage> extends SendContext<OMSG> {
        private final PublishConfirmSubscriber<OMSG> subscriber;

        protected ConfirmSendContext(Channel channel, OMSG omsg, PublishConfirmSubscriber<OMSG> publishConfirmSubscriber) {
            super(channel, omsg);
            this.subscriber = publishConfirmSubscriber;
        }

        @Override // reactor.rabbitmq.Sender.SendContext
        public void publish(OutboundMessage outboundMessage) throws Exception {
            long nextPublishSeqNo = this.channel.getNextPublishSeqNo();
            try {
                ((PublishConfirmSubscriber) this.subscriber).unconfirmed.putIfAbsent(Long.valueOf(nextPublishSeqNo), this.message);
                this.channel.basicPublish(outboundMessage.getExchange(), outboundMessage.getRoutingKey(), ((PublishConfirmSubscriber) this.subscriber).trackReturned, (AMQP.BasicProperties) ((PublishConfirmSubscriber) this.subscriber).propertiesProcessor.apply(this.message.getProperties(), Long.valueOf(nextPublishSeqNo)), outboundMessage.getBody());
            } catch (Exception e) {
                ((PublishConfirmSubscriber) this.subscriber).unconfirmed.remove(Long.valueOf(nextPublishSeqNo));
                throw e;
            }
        }

        @Override // reactor.rabbitmq.Sender.SendContext
        public void publish() throws Exception {
            publish(getMessage());
        }
    }

    /* loaded from: input_file:reactor/rabbitmq/Sender$PublishConfirmOperator.class */
    private static class PublishConfirmOperator<OMSG extends OutboundMessage> extends FluxOperator<OMSG, OutboundMessageResult<OMSG>> {
        private final Channel channel;
        private final SendOptions options;

        public PublishConfirmOperator(Publisher<OMSG> publisher, Channel channel, SendOptions sendOptions) {
            super(Flux.from(publisher));
            this.channel = channel;
            this.options = sendOptions;
        }

        public void subscribe(CoreSubscriber<? super OutboundMessageResult<OMSG>> coreSubscriber) {
            this.source.subscribe(new PublishConfirmSubscriber(this.channel, coreSubscriber, this.options));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/rabbitmq/Sender$PublishConfirmSubscriber.class */
    public static class PublishConfirmSubscriber<OMSG extends OutboundMessage> implements CoreSubscriber<OMSG>, Subscription {
        private final AtomicReference<SubscriberState> state;
        private final AtomicReference<Throwable> firstException;
        private final ConcurrentNavigableMap<Long, OMSG> unconfirmed;
        private final Channel channel;
        private final Subscriber<? super OutboundMessageResult<OMSG>> subscriber;
        private final BiConsumer<SendContext, Exception> exceptionHandler;
        private Subscription subscription;
        private ConfirmListener confirmListener;
        private ReturnListener returnListener;
        private ShutdownListener shutdownListener;
        private final boolean trackReturned;
        private final BiFunction<AMQP.BasicProperties, Long, AMQP.BasicProperties> propertiesProcessor;

        private PublishConfirmSubscriber(Channel channel, Subscriber<? super OutboundMessageResult<OMSG>> subscriber, SendOptions sendOptions) {
            this.state = new AtomicReference<>(SubscriberState.INIT);
            this.firstException = new AtomicReference<>();
            this.unconfirmed = new ConcurrentSkipListMap();
            this.channel = channel;
            this.subscriber = subscriber;
            this.exceptionHandler = sendOptions.getExceptionHandler();
            this.trackReturned = sendOptions.isTrackReturned();
            if (this.trackReturned) {
                this.propertiesProcessor = (v0, v1) -> {
                    return addReactorRabbitMQDeliveryTag(v0, v1);
                };
            } else {
                this.propertiesProcessor = (basicProperties, l) -> {
                    return basicProperties;
                };
            }
        }

        public void request(long j) {
            this.subscription.request(j);
        }

        public void cancel() {
            this.subscription.cancel();
        }

        public void onSubscribe(Subscription subscription) {
            if (Operators.validate(this.subscription, subscription)) {
                if (this.trackReturned) {
                    this.returnListener = (i, str, str2, str3, basicProperties, bArr) -> {
                        try {
                            Object obj = basicProperties.getHeaders().get(Sender.REACTOR_RABBITMQ_DELIVERY_TAG_HEADER);
                            if (obj instanceof Long) {
                                Long l = (Long) obj;
                                this.subscriber.onNext(new OutboundMessageResult((OutboundMessage) this.unconfirmed.get(l), true, true));
                                this.unconfirmed.remove(l);
                            } else {
                                handleError(new IllegalArgumentException("Missing header reactor_rabbitmq_delivery_tag"), null);
                            }
                        } catch (Exception e) {
                            handleError(e, null);
                        }
                    };
                    this.channel.addReturnListener(this.returnListener);
                }
                this.confirmListener = new ConfirmListener() { // from class: reactor.rabbitmq.Sender.PublishConfirmSubscriber.1
                    public void handleAck(long j, boolean z) {
                        handleAckNack(j, z, true);
                    }

                    public void handleNack(long j, boolean z) {
                        handleAckNack(j, z, false);
                    }

                    private void handleAckNack(long j, boolean z, boolean z2) {
                        if (z) {
                            try {
                                Iterator it = PublishConfirmSubscriber.this.unconfirmed.headMap((ConcurrentNavigableMap) Long.valueOf(j), true).entrySet().iterator();
                                while (it.hasNext()) {
                                    PublishConfirmSubscriber.this.subscriber.onNext(new OutboundMessageResult((OutboundMessage) ((Map.Entry) it.next()).getValue(), z2, false));
                                    it.remove();
                                }
                            } catch (Exception e) {
                                PublishConfirmSubscriber.this.handleError(e, null);
                            }
                        } else {
                            OutboundMessage outboundMessage = (OutboundMessage) PublishConfirmSubscriber.this.unconfirmed.get(Long.valueOf(j));
                            if (outboundMessage != null) {
                                try {
                                    PublishConfirmSubscriber.this.unconfirmed.remove(Long.valueOf(j));
                                    PublishConfirmSubscriber.this.subscriber.onNext(new OutboundMessageResult(outboundMessage, z2, false));
                                } catch (Exception e2) {
                                    PublishConfirmSubscriber.this.handleError(e2, new OutboundMessageResult(outboundMessage, z2, false));
                                }
                            }
                        }
                        if (PublishConfirmSubscriber.this.unconfirmed.isEmpty()) {
                            PublishConfirmSubscriber.this.maybeComplete();
                        }
                    }
                };
                this.channel.addConfirmListener(this.confirmListener);
                this.shutdownListener = shutdownSignalException -> {
                    Iterator it = this.unconfirmed.entrySet().iterator();
                    while (it.hasNext()) {
                        OutboundMessage outboundMessage = (OutboundMessage) ((Map.Entry) it.next()).getValue();
                        if (!outboundMessage.isPublished()) {
                            try {
                                this.subscriber.onNext(new OutboundMessageResult(outboundMessage, false, false));
                                it.remove();
                            } catch (Exception e) {
                                Sender.LOGGER.info("Error while nacking messages after channel failure");
                            }
                        }
                    }
                    if (shutdownSignalException.isHardError() || shutdownSignalException.isInitiatedByApplication()) {
                        return;
                    }
                    this.subscriber.onError(shutdownSignalException);
                };
                this.channel.addShutdownListener(this.shutdownListener);
                this.state.set(SubscriberState.ACTIVE);
                this.subscription = subscription;
                this.subscriber.onSubscribe(this);
            }
        }

        public void onNext(OMSG omsg) {
            if (checkComplete(omsg)) {
                return;
            }
            long nextPublishSeqNo = this.channel.getNextPublishSeqNo();
            try {
                this.unconfirmed.putIfAbsent(Long.valueOf(nextPublishSeqNo), omsg);
                this.channel.basicPublish(omsg.getExchange(), omsg.getRoutingKey(), this.trackReturned, this.propertiesProcessor.apply(omsg.getProperties(), Long.valueOf(nextPublishSeqNo)), omsg.getBody());
                omsg.published();
            } catch (Exception e) {
                this.unconfirmed.remove(Long.valueOf(nextPublishSeqNo));
                try {
                    this.exceptionHandler.accept(new ConfirmSendContext(this.channel, omsg, this), e);
                } catch (RabbitFluxRetryTimeoutException e2) {
                    this.subscriber.onNext(new OutboundMessageResult(omsg, false, false));
                } catch (Exception e3) {
                    handleError(e3, new OutboundMessageResult<>(omsg, false, false));
                }
            }
        }

        private static AMQP.BasicProperties addReactorRabbitMQDeliveryTag(AMQP.BasicProperties basicProperties, long j) {
            AMQP.BasicProperties basicProperties2 = basicProperties != null ? basicProperties : new AMQP.BasicProperties();
            HashMap hashMap = basicProperties2.getHeaders() != null ? new HashMap(basicProperties2.getHeaders()) : new HashMap();
            hashMap.putIfAbsent(Sender.REACTOR_RABBITMQ_DELIVERY_TAG_HEADER, Long.valueOf(j));
            return basicProperties2.builder().headers(hashMap).build();
        }

        public void onError(Throwable th) {
            if (!this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.COMPLETE) && !this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
                if (this.firstException.compareAndSet(null, th) && this.state.get() == SubscriberState.COMPLETE) {
                    Operators.onErrorDropped(th, currentContext());
                    return;
                }
                return;
            }
            this.channel.removeConfirmListener(this.confirmListener);
            this.channel.removeShutdownListener(this.shutdownListener);
            if (this.returnListener != null) {
                this.channel.removeReturnListener(this.returnListener);
            }
            this.subscriber.onError(th);
        }

        public void onComplete() {
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.OUTBOUND_DONE) && this.unconfirmed.size() == 0) {
                maybeComplete();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void handleError(Exception exc, @Nullable OutboundMessageResult<OMSG> outboundMessageResult) {
            Sender.LOGGER.error("error in publish confirm sending", exc);
            boolean checkComplete = checkComplete(exc);
            this.firstException.compareAndSet(null, exc);
            if (checkComplete) {
                return;
            }
            if (outboundMessageResult != null) {
                this.subscriber.onNext(outboundMessageResult);
            }
            onError(exc);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void maybeComplete() {
            if (this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
                this.channel.removeConfirmListener(this.confirmListener);
                this.channel.removeShutdownListener(this.shutdownListener);
                if (this.returnListener != null) {
                    this.channel.removeReturnListener(this.returnListener);
                }
                this.subscriber.onComplete();
            }
        }

        public <T> boolean checkComplete(T t) {
            boolean z = this.state.get() == SubscriberState.COMPLETE;
            if (z && this.firstException.get() == null) {
                Operators.onNextDropped(t, currentContext());
            }
            return z;
        }
    }

    /* loaded from: input_file:reactor/rabbitmq/Sender$SendContext.class */
    public static class SendContext<OMSG extends OutboundMessage> {
        protected final Channel channel;
        protected final OMSG message;

        protected SendContext(Channel channel, OMSG omsg) {
            this.channel = channel;
            this.message = omsg;
        }

        public OMSG getMessage() {
            return this.message;
        }

        public Channel getChannel() {
            return this.channel;
        }

        public void publish(OutboundMessage outboundMessage) throws Exception {
            this.channel.basicPublish(outboundMessage.getExchange(), outboundMessage.getRoutingKey(), outboundMessage.getProperties(), outboundMessage.getBody());
        }

        public void publish() throws Exception {
            publish(getMessage());
        }
    }

    public Sender() {
        this(new SenderOptions());
    }

    public Sender(SenderOptions senderOptions) {
        this.connection = new AtomicReference<>();
        this.channelCloseThreadPool = Executors.newCachedThreadPool();
        this.closingOrClosed = new AtomicBoolean(false);
        this.privateConnectionSubscriptionScheduler = senderOptions.getConnectionSubscriptionScheduler() == null;
        this.connectionSubscriptionScheduler = senderOptions.getConnectionSubscriptionScheduler() == null ? createScheduler("rabbitmq-sender-connection-subscription") : senderOptions.getConnectionSubscriptionScheduler();
        this.connectionMono = senderOptions.getConnectionMono() == null ? senderOptions.getConnectionMonoConfigurator().apply(Mono.fromCallable(() -> {
            return senderOptions.getConnectionSupplier() == null ? senderOptions.getConnectionFactory().newConnection() : senderOptions.getConnectionSupplier().apply(null);
        })).doOnNext(connection -> {
            this.connection.set(connection);
        }).subscribeOn(this.connectionSubscriptionScheduler).transform(this::cache) : senderOptions.getConnectionMono();
        this.channelMono = senderOptions.getChannelMono();
        this.channelCloseHandler = senderOptions.getChannelCloseHandler() == null ? ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE : senderOptions.getChannelCloseHandler();
        this.privateResourceManagementScheduler = senderOptions.getResourceManagementScheduler() == null;
        this.resourceManagementScheduler = senderOptions.getResourceManagementScheduler() == null ? createScheduler("rabbitmq-sender-resource-creation") : senderOptions.getResourceManagementScheduler();
        this.resourceManagementChannelMono = senderOptions.getResourceManagementChannelMono() == null ? this.connectionMono.map(CHANNEL_PROXY_CREATION_FUNCTION).transform(this::cache) : senderOptions.getResourceManagementChannelMono();
        if (senderOptions.getConnectionClosingTimeout() == null || Duration.ZERO.equals(senderOptions.getConnectionClosingTimeout())) {
            this.connectionClosingTimeout = -1;
        } else {
            this.connectionClosingTimeout = (int) senderOptions.getConnectionClosingTimeout().toMillis();
        }
    }

    protected Scheduler createScheduler(String str) {
        return Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, str);
    }

    protected <T> Mono<T> cache(Mono<T> mono) {
        return Utils.cache(mono);
    }

    public Mono<Void> send(Publisher<OutboundMessage> publisher) {
        return send(publisher, new SendOptions());
    }

    public Mono<Void> send(Publisher<OutboundMessage> publisher, @Nullable SendOptions sendOptions) {
        SendOptions sendOptions2 = sendOptions == null ? new SendOptions() : sendOptions;
        Mono<? extends Channel> channelMono = getChannelMono(sendOptions2);
        BiConsumer<SendContext, Exception> exceptionHandler = sendOptions2.getExceptionHandler();
        BiConsumer<SignalType, Channel> channelCloseHandler = getChannelCloseHandler(sendOptions2);
        return channelMono.flatMapMany(channel -> {
            return Flux.from(publisher).doOnNext(outboundMessage -> {
                try {
                    channel.basicPublish(outboundMessage.getExchange(), outboundMessage.getRoutingKey(), outboundMessage.getProperties(), outboundMessage.getBody());
                } catch (Exception e) {
                    exceptionHandler.accept(new SendContext(channel, outboundMessage), e);
                }
            }).doOnError(th -> {
                LOGGER.warn("Send failed with exception {}", th);
            }).doFinally(signalType -> {
                channelCloseHandler.accept(signalType, channel);
            });
        }).then();
    }

    public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMessage> publisher) {
        return sendWithPublishConfirms(publisher, new SendOptions());
    }

    public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMessage> publisher, SendOptions sendOptions) {
        return Flux.from(sendWithTypedPublishConfirms(publisher, sendOptions));
    }

    public <OMSG extends OutboundMessage> Flux<OutboundMessageResult<OMSG>> sendWithTypedPublishConfirms(Publisher<OMSG> publisher) {
        return sendWithTypedPublishConfirms(publisher, new SendOptions());
    }

    public <OMSG extends OutboundMessage> Flux<OutboundMessageResult<OMSG>> sendWithTypedPublishConfirms(Publisher<OMSG> publisher, @Nullable SendOptions sendOptions) {
        SendOptions sendOptions2 = sendOptions == null ? new SendOptions() : sendOptions;
        Mono<? extends Channel> channelMono = getChannelMono(sendOptions);
        BiConsumer<SignalType, Channel> channelCloseHandler = getChannelCloseHandler(sendOptions);
        Flux<OutboundMessageResult<OMSG>> flatMapMany = channelMono.map(channel -> {
            try {
                channel.confirmSelect();
                return channel;
            } catch (IOException e) {
                throw new RabbitFluxException("Error while setting publisher confirms on channel", e);
            }
        }).flatMapMany(channel2 -> {
            return new PublishConfirmOperator(publisher, channel2, sendOptions2).doFinally(signalType -> {
                if (signalType == SignalType.ON_ERROR) {
                    channelCloseHandler.accept(signalType, channel2);
                } else {
                    this.channelCloseThreadPool.execute(() -> {
                        channelCloseHandler.accept(signalType, channel2);
                    });
                }
            });
        });
        if (sendOptions2.getMaxInFlight() != null) {
            flatMapMany = flatMapMany.publishOn(sendOptions2.getScheduler(), sendOptions2.getMaxInFlight().intValue());
        }
        return flatMapMany;
    }

    Mono<? extends Channel> getChannelMono(SendOptions sendOptions) {
        return (Mono) Stream.of((Object[]) new Mono[]{sendOptions.getChannelMono(), this.channelMono}).filter((v0) -> {
            return Objects.nonNull(v0);
        }).findFirst().orElse(this.connectionMono.map(CHANNEL_CREATION_FUNCTION));
    }

    private BiConsumer<SignalType, Channel> getChannelCloseHandler(SendOptions sendOptions) {
        return sendOptions.getChannelCloseHandler() != null ? sendOptions.getChannelCloseHandler() : this.channelCloseHandler;
    }

    public RpcClient rpcClient(String str, String str2) {
        return new RpcClient(this.connectionMono.map(CHANNEL_CREATION_FUNCTION).transform(this::cache), str, str2);
    }

    public RpcClient rpcClient(String str, String str2, Supplier<String> supplier) {
        return new RpcClient(this.connectionMono.map(CHANNEL_CREATION_FUNCTION).transform(this::cache), str, str2, supplier);
    }

    public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification queueSpecification) {
        return declareQueue(queueSpecification, null);
    }

    public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification queueSpecification, @Nullable ResourceManagementOptions resourceManagementOptions) {
        return declareQueue(queueSpecification, resourceManagementOptions);
    }

    public Mono<AMQP.Queue.DeclareOk> declareQueue(QueueSpecification queueSpecification) {
        return declareQueue(queueSpecification, null);
    }

    public Mono<AMQP.Queue.DeclareOk> declareQueue(QueueSpecification queueSpecification, @Nullable ResourceManagementOptions resourceManagementOptions) {
        Mono<? extends Channel> channelMonoForResourceManagement = getChannelMonoForResourceManagement(resourceManagementOptions);
        AMQP.Queue.Declare build = queueSpecification.getName() == null ? new AMQP.Queue.Declare.Builder().queue("").durable(false).exclusive(true).autoDelete(true).arguments(queueSpecification.getArguments()).build() : new AMQP.Queue.Declare.Builder().queue(queueSpecification.getName()).durable(queueSpecification.isDurable()).exclusive(queueSpecification.isExclusive()).autoDelete(queueSpecification.isAutoDelete()).passive(queueSpecification.isPassive()).arguments(queueSpecification.getArguments()).build();
        return channelMonoForResourceManagement.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceManagementScheduler);
    }

    private Mono<? extends Channel> getChannelMonoForResourceManagement(ResourceManagementOptions resourceManagementOptions) {
        return (resourceManagementOptions == null || resourceManagementOptions.getChannelMono() == null) ? this.resourceManagementChannelMono : resourceManagementOptions.getChannelMono();
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification queueSpecification) {
        return delete(queueSpecification, false, false);
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification queueSpecification, @Nullable ResourceManagementOptions resourceManagementOptions) {
        return delete(queueSpecification, false, false, resourceManagementOptions);
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification queueSpecification, boolean z, boolean z2) {
        return deleteQueue(queueSpecification, z, z2);
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification queueSpecification, boolean z, boolean z2, @Nullable ResourceManagementOptions resourceManagementOptions) {
        return deleteQueue(queueSpecification, z, z2, resourceManagementOptions);
    }

    public Mono<AMQP.Queue.DeleteOk> deleteQueue(QueueSpecification queueSpecification, boolean z, boolean z2) {
        return deleteQueue(queueSpecification, z, z2, null);
    }

    public Mono<AMQP.Queue.DeleteOk> deleteQueue(QueueSpecification queueSpecification, boolean z, boolean z2, @Nullable ResourceManagementOptions resourceManagementOptions) {
        Mono<? extends Channel> channelMonoForResourceManagement = getChannelMonoForResourceManagement(resourceManagementOptions);
        AMQP.Queue.Delete build = new AMQP.Queue.Delete.Builder().queue(queueSpecification.getName()).ifUnused(z).ifEmpty(z2).build();
        return channelMonoForResourceManagement.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchangeSpecification) {
        return declareExchange(exchangeSpecification, null);
    }

    public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchangeSpecification, @Nullable ResourceManagementOptions resourceManagementOptions) {
        return declareExchange(exchangeSpecification, resourceManagementOptions);
    }

    public Mono<AMQP.Exchange.DeclareOk> declareExchange(ExchangeSpecification exchangeSpecification) {
        return declareExchange(exchangeSpecification, null);
    }

    public Mono<AMQP.Exchange.DeclareOk> declareExchange(ExchangeSpecification exchangeSpecification, @Nullable ResourceManagementOptions resourceManagementOptions) {
        Mono<? extends Channel> channelMonoForResourceManagement = getChannelMonoForResourceManagement(resourceManagementOptions);
        AMQP.Exchange.Declare build = new AMQP.Exchange.Declare.Builder().exchange(exchangeSpecification.getName()).type(exchangeSpecification.getType()).durable(exchangeSpecification.isDurable()).autoDelete(exchangeSpecification.isAutoDelete()).internal(exchangeSpecification.isInternal()).passive(exchangeSpecification.isPassive()).arguments(exchangeSpecification.getArguments()).build();
        return channelMonoForResourceManagement.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification exchangeSpecification) {
        return delete(exchangeSpecification, false);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification exchangeSpecification, @Nullable ResourceManagementOptions resourceManagementOptions) {
        return delete(exchangeSpecification, false, resourceManagementOptions);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification exchangeSpecification, boolean z) {
        return deleteExchange(exchangeSpecification, z);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification exchangeSpecification, boolean z, @Nullable ResourceManagementOptions resourceManagementOptions) {
        return deleteExchange(exchangeSpecification, z, resourceManagementOptions);
    }

    public Mono<AMQP.Exchange.DeleteOk> deleteExchange(ExchangeSpecification exchangeSpecification, boolean z) {
        return deleteExchange(exchangeSpecification, z, null);
    }

    public Mono<AMQP.Exchange.DeleteOk> deleteExchange(ExchangeSpecification exchangeSpecification, boolean z, @Nullable ResourceManagementOptions resourceManagementOptions) {
        Mono<? extends Channel> channelMonoForResourceManagement = getChannelMonoForResourceManagement(resourceManagementOptions);
        AMQP.Exchange.Delete build = new AMQP.Exchange.Delete.Builder().exchange(exchangeSpecification.getName()).ifUnused(z).build();
        return channelMonoForResourceManagement.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Queue.UnbindOk> unbindQueue(BindingSpecification bindingSpecification) {
        return unbind(bindingSpecification);
    }

    public Mono<AMQP.Queue.UnbindOk> unbindQueue(BindingSpecification bindingSpecification, @Nullable ResourceManagementOptions resourceManagementOptions) {
        return unbind(bindingSpecification, resourceManagementOptions);
    }

    public Mono<AMQP.Queue.UnbindOk> unbind(BindingSpecification bindingSpecification) {
        return unbind(bindingSpecification, null);
    }

    public Mono<AMQP.Queue.UnbindOk> unbind(BindingSpecification bindingSpecification, @Nullable ResourceManagementOptions resourceManagementOptions) {
        Mono<? extends Channel> channelMonoForResourceManagement = getChannelMonoForResourceManagement(resourceManagementOptions);
        AMQP.Queue.Unbind build = new AMQP.Queue.Unbind.Builder().exchange(bindingSpecification.getExchange()).queue(bindingSpecification.getQueue()).routingKey(bindingSpecification.getRoutingKey()).arguments(bindingSpecification.getArguments()).build();
        return channelMonoForResourceManagement.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Exchange.UnbindOk> unbindExchange(BindingSpecification bindingSpecification) {
        return unbindExchange(bindingSpecification, null);
    }

    public Mono<AMQP.Exchange.UnbindOk> unbindExchange(BindingSpecification bindingSpecification, @Nullable ResourceManagementOptions resourceManagementOptions) {
        Mono<? extends Channel> channelMonoForResourceManagement = getChannelMonoForResourceManagement(resourceManagementOptions);
        AMQP.Exchange.Unbind build = new AMQP.Exchange.Unbind.Builder().source(bindingSpecification.getExchange()).destination(bindingSpecification.getQueue()).routingKey(bindingSpecification.getRoutingKey()).arguments(bindingSpecification.getArguments()).build();
        return channelMonoForResourceManagement.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Queue.BindOk> bindQueue(BindingSpecification bindingSpecification) {
        return bind(bindingSpecification);
    }

    public Mono<AMQP.Queue.BindOk> bindQueue(BindingSpecification bindingSpecification, @Nullable ResourceManagementOptions resourceManagementOptions) {
        return bind(bindingSpecification, resourceManagementOptions);
    }

    public Mono<AMQP.Queue.BindOk> bind(BindingSpecification bindingSpecification) {
        return bind(bindingSpecification, null);
    }

    public Mono<AMQP.Queue.BindOk> bind(BindingSpecification bindingSpecification, @Nullable ResourceManagementOptions resourceManagementOptions) {
        Mono<? extends Channel> channelMonoForResourceManagement = getChannelMonoForResourceManagement(resourceManagementOptions);
        AMQP.Queue.Bind build = new AMQP.Queue.Bind.Builder().exchange(bindingSpecification.getExchange()).queue(bindingSpecification.getQueue()).routingKey(bindingSpecification.getRoutingKey()).arguments(bindingSpecification.getArguments()).build();
        return channelMonoForResourceManagement.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Exchange.BindOk> bindExchange(BindingSpecification bindingSpecification) {
        return bindExchange(bindingSpecification, null);
    }

    public Mono<AMQP.Exchange.BindOk> bindExchange(BindingSpecification bindingSpecification, @Nullable ResourceManagementOptions resourceManagementOptions) {
        Mono<? extends Channel> channelMonoForResourceManagement = getChannelMonoForResourceManagement(resourceManagementOptions);
        AMQP.Exchange.Bind build = new AMQP.Exchange.Bind.Builder().source(bindingSpecification.getExchange()).destination(bindingSpecification.getExchangeTo()).routingKey(bindingSpecification.getRoutingKey()).arguments(bindingSpecification.getArguments()).build();
        return channelMonoForResourceManagement.map(channel -> {
            try {
                return channel.asyncCompletableRpc(build);
            } catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(completableFuture -> {
            return Mono.fromCompletionStage(completableFuture);
        }).flatMap(command -> {
            return Mono.just(command.getMethod());
        }).publishOn(this.resourceManagementScheduler);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.closingOrClosed.compareAndSet(false, true)) {
            if (this.connection.get() != null) {
                Helpers.safelyExecute(LOGGER, () -> {
                    this.connection.get().close(this.connectionClosingTimeout);
                }, "Error while closing sender connection");
            }
            if (this.privateConnectionSubscriptionScheduler) {
                Helpers.safelyExecute(LOGGER, () -> {
                    this.connectionSubscriptionScheduler.dispose();
                }, "Error while disposing connection subscription scheduler");
            }
            if (this.privateResourceManagementScheduler) {
                Helpers.safelyExecute(LOGGER, () -> {
                    this.resourceManagementScheduler.dispose();
                }, "Error while disposing resource management scheduler");
            }
            Helpers.safelyExecute(LOGGER, () -> {
                this.channelCloseThreadPool.shutdown();
            }, "Error while closing channel closing thread pool");
        }
    }
}
