package org.springframework.amqp.rabbit.core;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.Return;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.AmqpIllegalStateException;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.AmqpMessageReturnedException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.ReceiveAndReplyCallback;
import org.springframework.amqp.core.ReceiveAndReplyMessageCallback;
import org.springframework.amqp.core.ReplyToAddressCallback;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ChannelProxy;
import org.springframework.amqp.rabbit.connection.ClosingRecoveryListener;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.connection.PendingConfirm;
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.connection.ThreadChannelConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitOperations;
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.support.ConsumerCancelledException;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.Delivery;
import org.springframework.amqp.rabbit.support.ListenerContainerAware;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.amqp.rabbit.support.ValueExpression;
import org.springframework.amqp.rabbit.support.micrometer.RabbitMessageSenderContext;
import org.springframework.amqp.rabbit.support.micrometer.RabbitTemplateObservation;
import org.springframework.amqp.rabbit.support.micrometer.RabbitTemplateObservationConvention;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.expression.BeanFactoryResolver;
import org.springframework.context.expression.MapAccessor;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.lang.Nullable;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-rabbit-3.0.10.jar:org/springframework/amqp/rabbit/core/RabbitTemplate.class */
public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener, ApplicationContextAware, ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
    // Compile-time constants and the shared SpEL parser.
    private static final String UNCHECKED = "unchecked";
    private static final String RETURN_CORRELATION_KEY = "spring_request_return_correlation";
    private static final String DEFAULT_EXCHANGE = "";
    private static final String DEFAULT_ROUTING_KEY = "";
    private static final long DEFAULT_REPLY_TIMEOUT = 5000;
    private static final long DEFAULT_CONSUME_TIMEOUT = 10000;
    private static final String DEFAULT_ENCODING = "UTF-8";
    private static final SpelExpressionParser PARSER = new SpelExpressionParser();

    // Settings that start out with a built-in default and may be replaced through setters.
    private String exchange = DEFAULT_EXCHANGE;
    private String routingKey = DEFAULT_ROUTING_KEY;
    private long receiveTimeout = 0;
    private long replyTimeout = DEFAULT_REPLY_TIMEOUT;
    private String encoding = DEFAULT_ENCODING;
    private MessageConverter messageConverter = new SimpleMessageConverter();
    private MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    private Expression mandatoryExpression = new ValueExpression(false);
    private boolean useDirectReplyToContainer = true;
    private String beanName = "rabbitTemplate";
    private String correlationKey;

    // Optional collaborators and settings; unset (null / false) until configured.
    private ApplicationContext applicationContext;
    private String defaultReceiveQueue;
    private String replyAddress;

    @Nullable
    private ConfirmCallback confirmCallback;
    private ReturnsCallback returnsCallback;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<?> recoveryCallback;
    private Expression sendConnectionFactorySelectorExpression;
    private Expression receiveConnectionFactorySelectorExpression;
    private Expression userIdExpression;
    private Collection<MessagePostProcessor> beforePublishPostProcessors;
    private Collection<MessagePostProcessor> afterReceivePostProcessors;
    private CorrelationDataPostProcessor correlationDataPostProcessor;
    private Executor taskExecutor;
    private ErrorHandler replyErrorHandler;

    @Nullable
    private RabbitTemplateObservationConvention observationConvention;

    // Plain boolean switches.
    private boolean useTemporaryReplyQueues;
    private boolean userCorrelationId;
    private boolean usePublisherConnection;
    private boolean noLocalReplyConsumer;
    private boolean useChannelForCorrelation;
    private boolean observationEnabled;

    // Flags declared volatile.
    private volatile boolean usingFastReplyTo;
    private volatile boolean evaluatedFastReplyTo;
    private volatile boolean isListener;
    private volatile boolean observationRegistryObtained;

    // Internal state created once per template instance.
    private final String uuid = UUID.randomUUID().toString();
    private final ThreadLocal<Channel> dedicatedChannels = new ThreadLocal<>();
    private final AtomicInteger activeTemplateCallbacks = new AtomicInteger();
    private final AtomicInteger messageTagProvider = new AtomicInteger();
    private final AtomicInteger containerInstance = new AtomicInteger();
    private final ConcurrentMap<Channel, RabbitTemplate> publisherConfirmChannels = new ConcurrentHashMap<>();
    private final Map<Object, PendingReply> replyHolder = new ConcurrentHashMap<>();
    // Guarded by synchronizing on the map itself in stop() and isRunning().
    private final Map<ConnectionFactory, DirectReplyToMessageListenerContainer> directReplyToContainers = new HashMap<>();
    private final Map<String, Object> consumerArgs = new HashMap<>();
    private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext();
    // Default callback: resolves the reply-to address from the first lambda argument only.
    private final ReplyToAddressCallback<?> defaultReplyToAddressCallback =
            (request, reply) -> getReplyToAddress(request);

    /**
     * Single-method callback type held by the template's {@code confirmCallback} field.
     */
    @FunctionalInterface
    /* loaded from: input_file:BOOT-INF/lib/spring-rabbit-3.0.10.jar:org/springframework/amqp/rabbit/core/RabbitTemplate$ConfirmCallback.class */
    public interface ConfirmCallback {
        /**
         * Receives a confirmation notification.
         *
         * @param correlationData the correlation data associated with the notification, may be {@code null}
         * @param z NOTE(review): presumably the ack (true) / nack (false) flag; name lost in decompilation, confirm
         * @param str NOTE(review): presumably an optional failure cause; may be {@code null}, confirm
         */
        void confirm(@Nullable CorrelationData correlationData, boolean z, @Nullable String str);
    }

    /**
     * Holder for a reply that has not arrived yet. Wraps a {@link CompletableFuture} that is
     * completed with the reply {@link Message} or completed exceptionally, and additionally
     * carries a saved reply-to value and a saved correlation value.
     */
    public static class PendingReply {

        /** Completed by {@link #reply(Message)} or {@link #completeExceptionally(Throwable)}. */
        private final CompletableFuture<Message> future = new CompletableFuture<>();

        @Nullable
        private volatile String savedReplyTo;

        @Nullable
        private volatile String savedCorrelation;

        private PendingReply() {
        }

        /**
         * @return the saved reply-to value, or {@code null} if none was stored
         */
        @Nullable
        public String getSavedReplyTo() {
            return this.savedReplyTo;
        }

        /**
         * @param str the reply-to value to store, may be {@code null}
         */
        public void setSavedReplyTo(@Nullable String str) {
            this.savedReplyTo = str;
        }

        /**
         * @return the saved correlation value, or {@code null} if none was stored
         */
        @Nullable
        public String getSavedCorrelation() {
            return this.savedCorrelation;
        }

        /**
         * @param str the correlation value to store, may be {@code null}
         */
        public void setSavedCorrelation(@Nullable String str) {
            this.savedCorrelation = str;
        }

        /**
         * Blocks until the future completes.
         *
         * @return the reply message
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public Message get() throws InterruptedException {
            Message reply;
            try {
                reply = this.future.get();
            } catch (ExecutionException failure) {
                // Surface the underlying cause, translated by RabbitExceptionTranslator.
                Throwable cause = failure.getCause();
                throw RabbitExceptionTranslator.convertRabbitAccessException(cause);
            }
            return reply;
        }

        /**
         * Blocks for at most the given time until the future completes.
         *
         * @param j the maximum time to wait
         * @param timeUnit the unit of {@code j}
         * @return the reply message, or {@code null} when the wait timed out
         * @throws InterruptedException if the waiting thread is interrupted
         */
        @Nullable
        public Message get(long j, TimeUnit timeUnit) throws InterruptedException {
            Message reply = null;
            try {
                reply = this.future.get(j, timeUnit);
            } catch (TimeoutException timedOut) {
                // A timeout is reported to the caller as a null reply rather than as an exception.
            } catch (ExecutionException failure) {
                Throwable cause = failure.getCause();
                throw RabbitExceptionTranslator.convertRabbitAccessException(cause);
            }
            return reply;
        }

        /**
         * Completes the future normally with the given message.
         *
         * @param message the reply
         */
        public void reply(Message message) {
            this.future.complete(message);
        }

        /**
         * Completes the future exceptionally with the given returned-message exception.
         *
         * @param amqpMessageReturnedException the exception to complete with
         */
        public void returned(AmqpMessageReturnedException amqpMessageReturnedException) {
            completeExceptionally(amqpMessageReturnedException);
        }

        /**
         * Completes the future exceptionally.
         *
         * @param th the failure to complete with
         */
        public void completeExceptionally(Throwable th) {
            this.future.completeExceptionally(th);
        }
    }

    /**
     * Single-method callback type held by the template's {@code returnsCallback} field.
     */
    @FunctionalInterface
    /* loaded from: input_file:BOOT-INF/lib/spring-rabbit-3.0.10.jar:org/springframework/amqp/rabbit/core/RabbitTemplate$ReturnsCallback.class */
    public interface ReturnsCallback {
        /**
         * Receives a returned message.
         *
         * @param returnedMessage the returned message and its associated data
         */
        void returnedMessage(ReturnedMessage returnedMessage);
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-rabbit-3.0.10.jar:org/springframework/amqp/rabbit/core/RabbitTemplate$TemplateConsumer.class */
    protected static abstract class TemplateConsumer extends DefaultConsumer {
        public TemplateConsumer(Channel channel) {
            super(channel);
        }

        public String toString() {
            return "TemplateConsumer [channel=" + getChannel() + ", consumerTag=" + getConsumerTag() + "]";
        }
    }

    /**
     * Creates a template without a connection factory and applies the default strategies
     * via {@link #initDefaultStrategies()}.
     */
    public RabbitTemplate() {
        initDefaultStrategies();
    }

    /**
     * Creates a template that uses the given connection factory.
     *
     * <p>Chains to the no-arg constructor first so that {@link #initDefaultStrategies()} runs for
     * this constructor as well; otherwise a subclass overriding that hook would only see it
     * invoked when the no-arg constructor is used.
     *
     * @param connectionFactory the connection factory to use
     */
    public RabbitTemplate(ConnectionFactory connectionFactory) {
        this();
        setConnectionFactory(connectionFactory);
    }

    /**
     * Installs the default strategies: sets the message converter to a new
     * {@link SimpleMessageConverter}. Overridable by subclasses.
     */
    protected void initDefaultStrategies() {
        setMessageConverter(new SimpleMessageConverter());
    }

    /**
     * Sets the connection factory on the superclass. When the factory is a
     * {@link ThreadChannelConnectionFactory}, {@code usePublisherConnection} is switched on.
     *
     * @param connectionFactory the connection factory to use
     */
    @Override // org.springframework.amqp.rabbit.connection.RabbitAccessor
    public final void setConnectionFactory(ConnectionFactory connectionFactory) {
        super.setConnectionFactory(connectionFactory);
        boolean threadChannelFactory = connectionFactory instanceof ThreadChannelConnectionFactory;
        if (!threadChannelFactory) {
            return;
        }
        this.usePublisherConnection = true;
    }

    /**
     * Stores the application context.
     *
     * @param applicationContext the application context
     */
    @Override // org.springframework.context.ApplicationContextAware
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Sets the {@code observationEnabled} flag.
     *
     * @param z the new flag value
     */
    public void setObservationEnabled(boolean z) {
        this.observationEnabled = z;
    }

    /**
     * Sets the observation convention.
     *
     * @param rabbitTemplateObservationConvention the convention to store
     */
    public void setObservationConvention(RabbitTemplateObservationConvention rabbitTemplateObservationConvention) {
        this.observationConvention = rabbitTemplateObservationConvention;
    }

    /**
     * Sets the exchange used by the send methods that take no explicit exchange.
     *
     * @param str the exchange name; {@code null} is stored as the empty string
     */
    public void setExchange(@Nullable String str) {
        if (str == null) {
            this.exchange = "";
            return;
        }
        this.exchange = str;
    }

    /**
     * @return the configured exchange name
     */
    public String getExchange() {
        return this.exchange;
    }

    /**
     * Sets the routing key used by the send methods that take no explicit routing key.
     * The value is stored as given (no null substitution, unlike {@code setExchange}).
     *
     * @param str the routing key
     */
    public void setRoutingKey(String str) {
        this.routingKey = str;
    }

    /**
     * @return the configured routing key
     */
    public String getRoutingKey() {
        return this.routingKey;
    }

    /**
     * Sets the default receive queue name.
     *
     * @param str the queue name
     */
    public void setDefaultReceiveQueue(String str) {
        this.defaultReceiveQueue = str;
    }

    /**
     * @return the default receive queue name, or {@code null} if none was set
     */
    @Nullable
    public String getDefaultReceiveQueue() {
        return this.defaultReceiveQueue;
    }

    /**
     * Sets the encoding name (initially {@code "UTF-8"}).
     *
     * @param str the encoding name
     */
    public void setEncoding(String str) {
        this.encoding = str;
    }

    /**
     * @return the configured encoding name
     */
    public String getEncoding() {
        return this.encoding;
    }

    /**
     * Sets the reply address and resets the {@code evaluatedFastReplyTo} flag to {@code false}.
     * Synchronized on this template instance.
     *
     * @param str the reply address
     */
    public synchronized void setReplyAddress(String str) {
        this.replyAddress = str;
        this.evaluatedFastReplyTo = false;
    }

    /**
     * Sets the receive timeout. With a value of 0 (the initial value), {@code receive(String)}
     * uses {@code doReceiveNoWait}; otherwise it calls {@code receive(String, long)} with this value.
     *
     * @param j the timeout (presumably milliseconds; confirm against the timed receive)
     */
    public void setReceiveTimeout(long j) {
        this.receiveTimeout = j;
    }

    /**
     * Sets the reply timeout (initially 5000).
     *
     * @param j the timeout (presumably milliseconds; confirm against its use)
     */
    public void setReplyTimeout(long j) {
        this.replyTimeout = j;
    }

    /**
     * Sets the message converter. No null check is performed here.
     *
     * @param messageConverter the converter to use
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    /**
     * Sets the message properties converter.
     *
     * @param messagePropertiesConverter the converter to use; rejected by the assertion if {@code null}
     */
    public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter) {
        Assert.notNull(messagePropertiesConverter, "messagePropertiesConverter must not be null");
        this.messagePropertiesConverter = messagePropertiesConverter;
    }

    /**
     * @return the configured message properties converter
     */
    protected MessagePropertiesConverter getMessagePropertiesConverter() {
        return this.messagePropertiesConverter;
    }

    /**
     * @return the configured message converter
     */
    public MessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    /**
     * Sets the confirm callback. Only permitted while no callback is set yet, or when the
     * argument equals the callback already set; otherwise the state assertion fails.
     *
     * @param confirmCallback the callback to store
     */
    public void setConfirmCallback(ConfirmCallback confirmCallback) {
        boolean unsetOrSame = this.confirmCallback == null || this.confirmCallback.equals(confirmCallback);
        Assert.state(unsetOrSame, "Only one ConfirmCallback is supported by each RabbitTemplate");
        this.confirmCallback = confirmCallback;
    }

    /**
     * Sets the returns callback. Only permitted while no callback is set yet, or when the
     * argument equals the callback already set; otherwise the state assertion fails.
     *
     * @param returnsCallback the callback to store
     */
    public void setReturnsCallback(ReturnsCallback returnsCallback) {
        boolean unsetOrSame = this.returnsCallback == null || this.returnsCallback.equals(returnsCallback);
        Assert.state(unsetOrSame, "Only one ReturnCallback is supported by each RabbitTemplate");
        this.returnsCallback = returnsCallback;
    }

    /**
     * Replaces the mandatory expression with a {@link ValueExpression} wrapping the given constant.
     *
     * @param z the constant mandatory value
     */
    public void setMandatory(boolean z) {
        this.mandatoryExpression = new ValueExpression(Boolean.valueOf(z));
    }

    /**
     * Sets the mandatory expression.
     *
     * @param expression the expression; rejected by the assertion if {@code null}
     */
    public void setMandatoryExpression(Expression expression) {
        Assert.notNull(expression, "'mandatoryExpression' must not be null");
        this.mandatoryExpression = expression;
    }

    /**
     * Parses the given string with the shared SpEL parser and stores the result as the
     * mandatory expression.
     *
     * @param str the expression source; rejected by the assertion if {@code null}
     */
    public void setMandatoryExpressionString(String str) {
        Assert.notNull(str, "'mandatoryExpression' must not be null");
        this.mandatoryExpression = PARSER.parseExpression(str);
    }

    /**
     * Sets the expression passed to {@code obtainTargetConnectionFactory} by the send path.
     *
     * @param expression the selector expression
     */
    public void setSendConnectionFactorySelectorExpression(Expression expression) {
        this.sendConnectionFactorySelectorExpression = expression;
    }

    /**
     * Sets the expression passed to {@code obtainTargetConnectionFactory} by the receive path.
     *
     * @param expression the selector expression
     */
    public void setReceiveConnectionFactorySelectorExpression(Expression expression) {
        this.receiveConnectionFactorySelectorExpression = expression;
    }

    /**
     * Sets the correlation key to the trimmed argument, unless the trimmed argument equals
     * {@code IntegrationMessageHeaderAccessor.CORRELATION_ID}, in which case the current
     * value is left untouched.
     *
     * @param str the correlation key; must contain text
     */
    public void setCorrelationKey(String str) {
        Assert.hasText(str, "'correlationKey' must not be null or empty");
        String trimmedKey = str.trim();
        if (!trimmedKey.equals(IntegrationMessageHeaderAccessor.CORRELATION_ID)) {
            this.correlationKey = trimmedKey;
        }
    }

    /**
     * Sets the retry template.
     *
     * @param retryTemplate the retry template to store
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    /**
     * Sets the recovery callback.
     *
     * @param recoveryCallback the recovery callback to store
     */
    public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    /**
     * Configures the template's evaluation context: installs a bean resolver backed by the
     * given bean factory, then adds a {@link MapAccessor} property accessor.
     *
     * @param beanFactory the owning bean factory
     */
    @Override // org.springframework.beans.factory.BeanFactoryAware
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        BeanFactoryResolver beanResolver = new BeanFactoryResolver(beanFactory);
        this.evaluationContext.setBeanResolver(beanResolver);
        MapAccessor mapAccessor = new MapAccessor();
        this.evaluationContext.addPropertyAccessor(mapAccessor);
    }

    /**
     * Replaces the before-publish post processors with the given ones, ordered by
     * {@code MessagePostProcessorUtils.sort}.
     *
     * @param messagePostProcessorArr the processors; neither the array nor any element may be {@code null}
     */
    public void setBeforePublishPostProcessors(MessagePostProcessor... messagePostProcessorArr) {
        Assert.notNull(messagePostProcessorArr, "'beforePublishPostProcessors' cannot be null");
        Assert.noNullElements(messagePostProcessorArr, "'beforePublishPostProcessors' cannot have null elements");
        List<MessagePostProcessor> supplied = Arrays.asList(messagePostProcessorArr);
        this.beforePublishPostProcessors = MessagePostProcessorUtils.sort(supplied);
    }

    /**
     * Appends the given processors to the existing before-publish post processors (creating
     * the collection when none exists yet) and re-sorts the combined collection with
     * {@code MessagePostProcessorUtils.sort}.
     *
     * <p>Null elements are rejected up front, matching {@code setBeforePublishPostProcessors},
     * instead of being stored and failing later when the processors are applied.
     *
     * @param messagePostProcessorArr the processors to add; neither the array nor any element may be {@code null}
     */
    public void addBeforePublishPostProcessors(MessagePostProcessor... messagePostProcessorArr) {
        Assert.notNull(messagePostProcessorArr, "'beforePublishPostProcessors' cannot be null");
        Assert.noNullElements(messagePostProcessorArr, "'beforePublishPostProcessors' cannot have null elements");
        if (this.beforePublishPostProcessors == null) {
            this.beforePublishPostProcessors = new ArrayList<>();
        }
        this.beforePublishPostProcessors.addAll(Arrays.asList(messagePostProcessorArr));
        this.beforePublishPostProcessors = MessagePostProcessorUtils.sort(this.beforePublishPostProcessors);
    }

    /**
     * Removes the given processor from the before-publish post processors.
     *
     * @param messagePostProcessor the processor to remove; must not be {@code null}
     * @return {@code true} if the collection exists and reported a removal, {@code false} otherwise
     */
    public boolean removeBeforePublishPostProcessor(MessagePostProcessor messagePostProcessor) {
        Assert.notNull(messagePostProcessor, "'beforePublishPostProcessor' cannot be null");
        if (this.beforePublishPostProcessors == null) {
            return false;
        }
        return this.beforePublishPostProcessors.remove(messagePostProcessor);
    }

    /**
     * Replaces the after-receive post processors with the given ones, ordered by
     * {@code MessagePostProcessorUtils.sort}.
     *
     * @param messagePostProcessorArr the processors; neither the array nor any element may be {@code null}
     */
    public void setAfterReceivePostProcessors(MessagePostProcessor... messagePostProcessorArr) {
        Assert.notNull(messagePostProcessorArr, "'afterReceivePostProcessors' cannot be null");
        Assert.noNullElements(messagePostProcessorArr, "'afterReceivePostProcessors' cannot have null elements");
        List<MessagePostProcessor> supplied = Arrays.asList(messagePostProcessorArr);
        this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(supplied);
    }

    /**
     * @return an unmodifiable view of the after-receive post processors, or {@code null}
     *         when none have been configured
     */
    @Nullable
    public Collection<MessagePostProcessor> getAfterReceivePostProcessors() {
        if (this.afterReceivePostProcessors == null) {
            return null;
        }
        return Collections.unmodifiableCollection(this.afterReceivePostProcessors);
    }

    /**
     * Appends the given processors to the existing after-receive post processors (creating
     * the collection when none exists yet) and re-sorts the combined collection with
     * {@code MessagePostProcessorUtils.sort}.
     *
     * <p>Null elements are rejected up front, matching {@code setAfterReceivePostProcessors},
     * instead of being stored and failing later when the processors are applied.
     *
     * @param messagePostProcessorArr the processors to add; neither the array nor any element may be {@code null}
     */
    public void addAfterReceivePostProcessors(MessagePostProcessor... messagePostProcessorArr) {
        Assert.notNull(messagePostProcessorArr, "'afterReceivePostProcessors' cannot be null");
        Assert.noNullElements(messagePostProcessorArr, "'afterReceivePostProcessors' cannot have null elements");
        if (this.afterReceivePostProcessors == null) {
            this.afterReceivePostProcessors = new ArrayList<>();
        }
        this.afterReceivePostProcessors.addAll(Arrays.asList(messagePostProcessorArr));
        this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(this.afterReceivePostProcessors);
    }

    /**
     * Removes the given processor from the after-receive post processors.
     *
     * @param messagePostProcessor the processor to remove; must not be {@code null}
     * @return {@code true} if the collection exists and reported a removal, {@code false} otherwise
     */
    public boolean removeAfterReceivePostProcessor(MessagePostProcessor messagePostProcessor) {
        Assert.notNull(messagePostProcessor, "'afterReceivePostProcessor' cannot be null");
        if (this.afterReceivePostProcessors == null) {
            return false;
        }
        return this.afterReceivePostProcessors.remove(messagePostProcessor);
    }

    /**
     * Sets the correlation data post processor.
     *
     * @param correlationDataPostProcessor the post processor to store
     */
    public void setCorrelationDataPostProcessor(CorrelationDataPostProcessor correlationDataPostProcessor) {
        this.correlationDataPostProcessor = correlationDataPostProcessor;
    }

    /**
     * Sets the {@code useTemporaryReplyQueues} flag. As {@code useDirectReplyTo()} shows, the
     * flag disables direct reply-to only when no reply address is set; with a reply address
     * it is ignored and a warning is logged.
     *
     * @param z the new flag value
     */
    public void setUseTemporaryReplyQueues(boolean z) {
        this.useTemporaryReplyQueues = z;
    }

    /**
     * Sets the {@code useDirectReplyToContainer} flag (initially {@code true}).
     *
     * @param z the new flag value
     */
    public void setUseDirectReplyToContainer(boolean z) {
        this.useDirectReplyToContainer = z;
    }

    /**
     * Sets the user id expression.
     *
     * @param expression the expression to store
     */
    public void setUserIdExpression(Expression expression) {
        this.userIdExpression = expression;
    }

    /**
     * Parses the given string with the shared SpEL parser and stores the result as the
     * user id expression. No null check is performed before parsing.
     *
     * @param str the expression source
     */
    public void setUserIdExpressionString(String str) {
        this.userIdExpression = PARSER.parseExpression(str);
    }

    /**
     * Stores the bean name (initially {@code "rabbitTemplate"}).
     *
     * @param str the bean name
     */
    @Override // org.springframework.beans.factory.BeanNameAware
    public void setBeanName(String str) {
        this.beanName = str;
    }

    /**
     * Sets the task executor.
     *
     * @param executor the executor to store
     */
    public void setTaskExecutor(Executor executor) {
        this.taskExecutor = executor;
    }

    /**
     * Sets the {@code userCorrelationId} flag.
     *
     * @param z the new flag value
     */
    public void setUserCorrelationId(boolean z) {
        this.userCorrelationId = z;
    }

    /**
     * @return the current value of the {@code usePublisherConnection} flag
     */
    public boolean isUsePublisherConnection() {
        return this.usePublisherConnection;
    }

    /**
     * Sets the {@code usePublisherConnection} flag. Note that {@code setConnectionFactory}
     * also switches this flag on when given a {@code ThreadChannelConnectionFactory}.
     *
     * @param z the new flag value
     */
    public void setUsePublisherConnection(boolean z) {
        this.usePublisherConnection = z;
    }

    /**
     * Sets the {@code noLocalReplyConsumer} flag.
     *
     * @param z the new flag value
     */
    public void setNoLocalReplyConsumer(boolean z) {
        this.noLocalReplyConsumer = z;
    }

    /**
     * Sets the reply error handler.
     *
     * @param errorHandler the error handler to store
     */
    public void setReplyErrorHandler(ErrorHandler errorHandler) {
        this.replyErrorHandler = errorHandler;
    }

    /**
     * Sets the {@code useChannelForCorrelation} flag.
     *
     * @param z the new flag value
     */
    public void setUseChannelForCorrelation(boolean z) {
        this.useChannelForCorrelation = z;
    }

    /**
     * Marks this template as a listener and reports the queue derived from the reply address.
     *
     * @return a single-element collection holding the reply address's routing key when its
     *         exchange name is empty; otherwise {@code null} (after an info-level log entry)
     * @throws IllegalStateException if no reply address is set, or it equals
     *         {@code Address.AMQ_RABBITMQ_REPLY_TO}
     */
    @Override // org.springframework.amqp.rabbit.support.ListenerContainerAware
    @Nullable
    public Collection<String> expectedQueueNames() {
        this.isListener = true;
        boolean directReplyTo = this.replyAddress == null
                || this.replyAddress.equals(Address.AMQ_RABBITMQ_REPLY_TO);
        if (directReplyTo) {
            throw new IllegalStateException("A listener container must not be provided when using direct reply-to");
        }
        Address address = new Address(this.replyAddress);
        if ("".equals(address.getExchangeName())) {
            return Collections.singletonList(address.getRoutingKey());
        }
        // The reply address names an exchange, so there is no plain queue name to report.
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Cannot verify reply queue because 'replyAddress' is not a simple queue name: " + this.replyAddress);
        }
        return null;
    }

    /**
     * Collects the correlation data of pending confirms that each registered publisher-confirm
     * channel expires for this template, using the cutoff {@code now - j}.
     *
     * @param j the age subtracted from the current time to form the cutoff passed to
     *          {@code expire} (same unit as {@code System.currentTimeMillis()})
     * @return the collected correlation data, or {@code null} when nothing was collected
     */
    @Nullable
    public Collection<CorrelationData> getUnconfirmed(long j) {
        HashSet<CorrelationData> expired = new HashSet<>();
        long cutoff = System.currentTimeMillis() - j;
        for (Channel channel : this.publisherConfirmChannels.keySet()) {
            PublisherCallbackChannel callbackChannel = (PublisherCallbackChannel) channel;
            for (PendingConfirm pending : callbackChannel.expire(this, cutoff)) {
                expired.add(pending.getCorrelationData());
            }
        }
        return expired.size() > 0 ? expired : null;
    }

    /**
     * @return the sum, over all registered publisher-confirm channels, of the pending
     *         confirm count each channel reports for this template
     */
    public int getUnconfirmedCount() {
        int pending = 0;
        for (Channel channel : this.publisherConfirmChannels.keySet()) {
            pending += ((PublisherCallbackChannel) channel).getPendingConfirmsCount(this);
        }
        return pending;
    }

    /**
     * Stores an entry in the consumer arguments map, replacing any existing value for the key.
     *
     * @param str the argument name
     * @param obj the argument value
     */
    public void addConsumerArg(String str, Object obj) {
        this.consumerArgs.put(str, obj);
    }

    /**
     * Removes an entry from the consumer arguments map.
     *
     * @param str the argument name
     * @return the value previously stored under the name, or {@code null} if there was none
     */
    public Object removeConsumerArg(String str) {
        return this.consumerArgs.remove(str);
    }

    /**
     * Lifecycle start: delegates to {@link #doStart()}.
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations, org.springframework.context.Lifecycle
    public void start() {
        doStart();
    }

    /**
     * Hook invoked by {@link #start()}; empty in this class.
     */
    protected void doStart() {
    }

    /**
     * Lifecycle stop: while holding the lock on the direct reply-to container map, stops every
     * container that reports itself running and then empties the map; afterwards calls
     * {@link #doStop()}.
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations, org.springframework.context.Lifecycle
    public void stop() {
        synchronized (this.directReplyToContainers) {
            for (DirectReplyToMessageListenerContainer container : this.directReplyToContainers.values()) {
                if (container.isRunning()) {
                    container.stop();
                }
            }
            this.directReplyToContainers.clear();
        }
        doStop();
    }

    /**
     * Hook invoked at the end of {@link #stop()}; empty in this class.
     */
    protected void doStop() {
    }

    /**
     * @return {@code true} if at least one direct reply-to container in the map reports
     *         itself running; evaluated while holding the lock on the map
     */
    public boolean isRunning() {
        synchronized (this.directReplyToContainers) {
            for (DirectReplyToMessageListenerContainer container : this.directReplyToContainers.values()) {
                if (container.isRunning()) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Bean destruction callback: delegates to {@link #stop()}.
     */
    @Override // org.springframework.beans.factory.DisposableBean
    public void destroy() {
        stop();
    }

    /**
     * Stores the result of {@link #useDirectReplyTo()} in {@code usingFastReplyTo} and then
     * marks the evaluation as done. If {@code useDirectReplyTo()} throws, neither field is updated.
     */
    private void evaluateFastReplyTo() {
        this.usingFastReplyTo = useDirectReplyTo();
        this.evaluatedFastReplyTo = true;
    }

    /**
     * Decides whether direct reply-to ({@code Address.AMQ_RABBITMQ_REPLY_TO}) should be used.
     *
     * <ul>
     * <li>Temporary reply queues requested and no reply address set: {@code false}.</li>
     * <li>Temporary reply queues requested together with a reply address: a warning is logged
     *     and the flag is ignored.</li>
     * <li>A reply address other than {@code Address.AMQ_RABBITMQ_REPLY_TO}: {@code false}.</li>
     * <li>Otherwise the pseudo queue is passively declared on a channel; success yields
     *     {@code true}.</li>
     * </ul>
     *
     * @return whether direct reply-to should be used
     * @throws AmqpConnectException or AmqpIOException from the declare attempt, when
     *         {@code shouldRethrow} says the failure must propagate
     */
    protected boolean useDirectReplyTo() {
        if (this.useTemporaryReplyQueues) {
            if (this.replyAddress != null) {
                this.logger.warn("'useTemporaryReplyQueues' is ignored when a 'replyAddress' is provided");
            } else {
                return false;
            }
        }
        boolean customReplyAddress = this.replyAddress != null
                && !Address.AMQ_RABBITMQ_REPLY_TO.equals(this.replyAddress);
        if (customReplyAddress) {
            return false;
        }
        try {
            Boolean declared = (Boolean) execute(channel -> {
                channel.queueDeclarePassive(Address.AMQ_RABBITMQ_REPLY_TO);
                return true;
            });
            return declared.booleanValue();
        } catch (AmqpConnectException | AmqpIOException e) {
            if (!shouldRethrow(e)) {
                return false;
            }
            throw e;
        }
    }

    /**
     * Classifies a failure of the passive declaration performed by {@code useDirectReplyTo()}.
     *
     * <p>Walks the cause chain looking for a {@link ShutdownSignalException}. If one is found
     * and {@code RabbitUtils.isPassiveDeclarationChannelClose} accepts it, a warning is logged,
     * the reply address is cleared and the failure is not rethrown. In every other case the
     * failure is logged at debug level (when enabled) and must be rethrown.
     *
     * @param amqpException the failure to classify
     * @return {@code true} if the caller should rethrow the exception, {@code false} otherwise
     */
    private boolean shouldRethrow(AmqpException amqpException) {
        Throwable cause = amqpException;
        while (cause != null && !(cause instanceof ShutdownSignalException)) {
            cause = cause.getCause();
        }
        boolean passiveDeclareClosedChannel = cause != null
                && RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException) cause);
        if (passiveDeclareClosedChannel) {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Broker does not support fast replies via 'amq.rabbitmq.reply-to', temporary queues will be used: " + cause.getMessage() + ".");
            }
            this.replyAddress = null;
            return false;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("IO error, deferring directReplyTo detection: " + amqpException.toString());
        }
        return true;
    }

    /**
     * Sends the message using the configured exchange and routing key.
     *
     * @param message the message to send
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public void send(Message message) throws AmqpException {
        send(this.exchange, this.routingKey, message);
    }

    /**
     * Sends the message to the configured exchange with the given routing key.
     *
     * @param str the routing key
     * @param message the message to send
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public void send(String str, Message message) throws AmqpException {
        send(this.exchange, str, message);
    }

    /**
     * Sends the message to the configured exchange with the given routing key and correlation data.
     *
     * @param str the routing key
     * @param message the message to send
     * @param correlationData the correlation data passed through to the four-argument send
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    public void send(String str, Message message, CorrelationData correlationData) throws AmqpException {
        send(this.exchange, str, message, correlationData);
    }

    /**
     * Sends the message to the given exchange and routing key without correlation data.
     *
     * @param str the exchange name
     * @param str2 the routing key
     * @param message the message to send
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public void send(String str, String str2, Message message) throws AmqpException {
        send(str, str2, message, null);
    }

    /**
     * Sends the message on a channel obtained through {@code execute}, using the connection
     * factory selected by the send selector expression.
     *
     * <p>The mandatory flag handed to {@code doSend} is {@code true} only when a returns
     * callback is set or the correlation data carries a non-blank id, and
     * {@code isMandatoryFor(message)} is {@code true}.
     *
     * @param str the exchange name
     * @param str2 the routing key
     * @param message the message to send
     * @param correlationData optional correlation data, may be {@code null}
     */
    public void send(String str, String str2, Message message, @Nullable CorrelationData correlationData) throws AmqpException {
        ConnectionFactory targetFactory =
                obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message);
        execute(channel -> {
            boolean returnsWanted = this.returnsCallback != null
                    || (correlationData != null && StringUtils.hasText(correlationData.getId()));
            boolean mandatory = returnsWanted && isMandatoryFor(message).booleanValue();
            doSend(channel, str, str2, message, mandatory, correlationData);
            return null;
        }, targetFactory);
    }

    /**
     * Resolves the connection factory to use for an operation.
     *
     * <p>The configured factory is returned unless an expression is given, the configured
     * factory is an {@link AbstractRoutingConnectionFactory}, and the expression evaluates to a
     * non-null lookup key. In that case the routing factory's target for the key is returned
     * when present; when absent, the configured factory is returned if lenient fallback is
     * enabled.
     *
     * @param expression the selector expression, may be {@code null}
     * @param obj optional root object for the expression evaluation, may be {@code null}
     * @return the connection factory to use
     * @throws IllegalStateException if no target exists for the lookup key and lenient
     *         fallback is disabled
     */
    private ConnectionFactory obtainTargetConnectionFactory(Expression expression, @Nullable Object obj) {
        if (expression == null) {
            return getConnectionFactory();
        }
        ConnectionFactory configured = getConnectionFactory();
        if (!(configured instanceof AbstractRoutingConnectionFactory)) {
            return getConnectionFactory();
        }
        AbstractRoutingConnectionFactory routingFactory = (AbstractRoutingConnectionFactory) configured;
        Object lookupKey;
        if (obj != null) {
            lookupKey = expression.getValue(this.evaluationContext, obj);
        } else {
            lookupKey = expression.getValue((EvaluationContext) this.evaluationContext);
        }
        if (lookupKey == null) {
            return getConnectionFactory();
        }
        ConnectionFactory target = routingFactory.getTargetConnectionFactory(lookupKey);
        if (target != null) {
            return target;
        }
        if (!routingFactory.isLenientFallback()) {
            throw new IllegalStateException("Cannot determine target ConnectionFactory for lookup key [" + lookupKey + "]");
        }
        return getConnectionFactory();
    }

    /**
     * Converts and sends the object using the configured exchange and routing key, without
     * correlation data.
     *
     * @param obj the payload to convert and send
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public void convertAndSend(Object obj) throws AmqpException {
        convertAndSend(this.exchange, this.routingKey, obj, (CorrelationData) null);
    }

    /**
     * Converts and sends the object using the configured exchange and routing key, with the
     * given correlation data.
     *
     * @param obj the payload to convert and send
     * @param correlationData the correlation data to pass along
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    public void correlationConvertAndSend(Object obj, CorrelationData correlationData) throws AmqpException {
        convertAndSend(this.exchange, this.routingKey, obj, correlationData);
    }

    /**
     * Converts and sends the object to the configured exchange with the given routing key,
     * without correlation data.
     *
     * @param str the routing key
     * @param obj the payload to convert and send
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public void convertAndSend(String str, Object obj) throws AmqpException {
        convertAndSend(this.exchange, str, obj, (CorrelationData) null);
    }

    /**
     * Converts and sends the object to the configured exchange with the given routing key and
     * correlation data.
     *
     * @param str the routing key
     * @param obj the payload to convert and send
     * @param correlationData the correlation data to pass along
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    public void convertAndSend(String str, Object obj, CorrelationData correlationData) throws AmqpException {
        convertAndSend(this.exchange, str, obj, correlationData);
    }

    /**
     * Converts and sends the object to the given exchange and routing key, without
     * correlation data.
     *
     * @param str the exchange name
     * @param str2 the routing key
     * @param obj the payload to convert and send
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public void convertAndSend(String str, String str2, Object obj) throws AmqpException {
        convertAndSend(str, str2, obj, (CorrelationData) null);
    }

    /**
     * Converts the object with {@code convertMessageIfNecessary} and sends the result to the
     * given exchange and routing key.
     *
     * @param str the exchange name
     * @param str2 the routing key
     * @param obj the payload to convert and send
     * @param correlationData optional correlation data, may be {@code null}
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    public void convertAndSend(String str, String str2, Object obj, @Nullable CorrelationData correlationData) throws AmqpException {
        Message converted = convertMessageIfNecessary(obj);
        send(str, str2, converted, correlationData);
    }

    /**
     * Converts, post-processes and sends the object using the configured exchange and routing key.
     *
     * @param obj the payload to convert and send
     * @param messagePostProcessor the post processor applied before sending
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public void convertAndSend(Object obj, MessagePostProcessor messagePostProcessor) throws AmqpException {
        convertAndSend(this.exchange, this.routingKey, obj, messagePostProcessor);
    }

    /**
     * Converts, post-processes and sends the object to the configured exchange with the given
     * routing key, without correlation data.
     *
     * @param str the routing key
     * @param obj the payload to convert and send
     * @param messagePostProcessor the post processor applied before sending
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public void convertAndSend(String str, Object obj, MessagePostProcessor messagePostProcessor) throws AmqpException {
        convertAndSend(this.exchange, str, obj, messagePostProcessor, null);
    }

    /**
     * Converts, post-processes and sends the object using the configured exchange and routing
     * key, with the given correlation data.
     *
     * @param obj the payload to convert and send
     * @param messagePostProcessor the post processor applied before sending
     * @param correlationData the correlation data to pass along
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    public void convertAndSend(Object obj, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
        convertAndSend(this.exchange, this.routingKey, obj, messagePostProcessor, correlationData);
    }

    /**
     * Converts, post-processes and sends the object to the configured exchange with the given
     * routing key and correlation data.
     *
     * @param str the routing key
     * @param obj the payload to convert and send
     * @param messagePostProcessor the post processor applied before sending
     * @param correlationData the correlation data to pass along
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    public void convertAndSend(String str, Object obj, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
        convertAndSend(this.exchange, str, obj, messagePostProcessor, correlationData);
    }

    /**
     * Converts, post-processes and sends the object to the given exchange and routing key,
     * without correlation data.
     *
     * @param str the exchange name
     * @param str2 the routing key
     * @param obj the payload to convert and send
     * @param messagePostProcessor the post processor applied before sending
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public void convertAndSend(String str, String str2, Object obj, MessagePostProcessor messagePostProcessor) throws AmqpException {
        convertAndSend(str, str2, obj, messagePostProcessor, null);
    }

    /**
     * Converts {@code obj} to a {@link Message} when necessary, runs it through
     * {@code messagePostProcessor}, and sends the processed message to exchange {@code str} with
     * routing key {@code str2}.
     *
     * @param correlationData optional correlation data, given to both the post-processor and {@code send}
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    public void convertAndSend(String str, String str2, Object obj, MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) throws AmqpException {
        final Message converted = convertMessageIfNecessary(obj);
        // The post-processor sees null-safe variants of the exchange and routing key.
        final Message processed = messagePostProcessor.postProcessMessage(
                converted, correlationData, nullSafeExchange(str), nullSafeRoutingKey(str2));
        send(str, str2, processed, correlationData);
    }

    /**
     * Receives a message from the queue returned by {@code getRequiredQueue()}.
     *
     * @return the received message, or {@code null} when none was obtained
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Message receive() throws AmqpException {
        final String queue = getRequiredQueue();
        return receive(queue);
    }

    /**
     * Receives a message from queue {@code str}, using a non-blocking get when the configured
     * receive timeout is zero and a timed receive otherwise.
     *
     * @return the received message, or {@code null} when none was obtained
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Message receive(String str) {
        if (this.receiveTimeout == 0) {
            return doReceiveNoWait(str);
        }
        return receive(str, this.receiveTimeout);
    }

    /**
     * Fetches at most one message from queue {@code str} with a single {@code basicGet}, without waiting.
     *
     * <p>The get is auto-acknowledged unless the template's channels are transacted. On a locally
     * transacted channel the delivery is acknowledged and committed here; on a transacted but not
     * locally transacted channel the delivery tag is registered with {@code ConnectionFactoryUtils}.
     *
     * @return the message built from the response, or {@code null} when the get returned nothing
     */
    @Nullable
    protected Message doReceiveNoWait(String str) {
        Message received = (Message) execute(channel -> {
            GetResponse response = channel.basicGet(str, !isChannelTransacted());
            if (response != null) {
                long tag = response.getEnvelope().getDeliveryTag();
                if (isChannelLocallyTransacted(channel)) {
                    channel.basicAck(tag, false);
                    channel.txCommit();
                } else if (isChannelTransacted()) {
                    ConnectionFactoryUtils.registerDeliveryTag(getConnectionFactory(), channel, Long.valueOf(tag));
                }
                return buildMessageFromResponse(response);
            }
            return null;
        }, obtainTargetConnectionFactory(this.receiveConnectionFactorySelectorExpression, str));
        logReceived("Received: ", received);
        return received;
    }

    /**
     * Receives a message from the required queue, using a non-blocking get when {@code j} is zero
     * and a timed receive with timeout {@code j} (milliseconds) otherwise.
     *
     * @return the received message, or {@code null} when none was obtained
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Message receive(long j) throws AmqpException {
        final String queue = getRequiredQueue();
        if (j == 0) {
            return doReceiveNoWait(queue);
        }
        return receive(queue, j);
    }

    /**
     * Receives one message from queue {@code str} by consuming with a timeout.
     *
     * <p>Acknowledgement depends on the channel's transactional state: a locally transacted channel
     * acknowledges and commits, a transacted but not locally transacted channel registers the
     * delivery tag with {@code ConnectionFactoryUtils}, and a non-transacted channel simply
     * acknowledges.
     *
     * <p>The receive connection-factory selector expression is evaluated against the queue name,
     * the same root object used by {@code doReceiveNoWait} and {@code doReceiveAndReply}.
     *
     * @param str the queue to consume from
     * @param j the timeout handed to {@code consumeDelivery} (waited on in milliseconds)
     * @return the received message, or {@code null} when no delivery was obtained
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Message receive(String str, long j) {
        Message message = (Message) execute(channel -> {
            Delivery consumeDelivery = consumeDelivery(channel, str, j);
            if (consumeDelivery == null) {
                return null;
            }
            long deliveryTag = consumeDelivery.getEnvelope().getDeliveryTag();
            if (isChannelLocallyTransacted(channel)) {
                channel.basicAck(deliveryTag, false);
                channel.txCommit();
            } else if (isChannelTransacted()) {
                ConnectionFactoryUtils.registerDeliveryTag(getConnectionFactory(), channel, Long.valueOf(deliveryTag));
            } else {
                channel.basicAck(deliveryTag, false);
            }
            return buildMessageFromDelivery(consumeDelivery);
        }, obtainTargetConnectionFactory(this.receiveConnectionFactorySelectorExpression, str));
        logReceived("Received: ", message);
        return message;
    }

    /**
     * Receives from the required queue and converts the received message.
     *
     * @return the converted payload, or {@code null} when no message was received
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Object receiveAndConvert() throws AmqpException {
        final String queue = getRequiredQueue();
        return receiveAndConvert(queue);
    }

    /**
     * Receives from queue {@code str} using the template's configured receive timeout and converts
     * the received message.
     *
     * @return the converted payload, or {@code null} when no message was received
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Object receiveAndConvert(String str) throws AmqpException {
        final long timeout = this.receiveTimeout;
        return receiveAndConvert(str, timeout);
    }

    /**
     * Receives from the required queue with timeout {@code j} and converts the received message.
     *
     * @return the converted payload, or {@code null} when no message was received
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Object receiveAndConvert(long j) throws AmqpException {
        final String queue = getRequiredQueue();
        return receiveAndConvert(queue, j);
    }

    /**
     * Receives from queue {@code str} (non-blocking when {@code j} is zero, timed otherwise) and
     * converts the message with the required message converter.
     *
     * @return the converted payload, or {@code null} when no message was received
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Object receiveAndConvert(String str, long j) throws AmqpException {
        final Message received;
        if (j == 0) {
            received = doReceiveNoWait(str);
        } else {
            received = receive(str, j);
        }
        if (received == null) {
            return null;
        }
        return getRequiredMessageConverter().fromMessage(received);
    }

    /**
     * Receives from the required queue and converts the message to the type described by
     * {@code parameterizedTypeReference}.
     *
     * @return the converted payload, or {@code null} when no message was received
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public <T> T receiveAndConvert(ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final String queue = getRequiredQueue();
        return (T) receiveAndConvert(queue, parameterizedTypeReference);
    }

    /**
     * Receives from queue {@code str} using the template's configured receive timeout and converts
     * the message to the type described by {@code parameterizedTypeReference}.
     *
     * @return the converted payload, or {@code null} when no message was received
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public <T> T receiveAndConvert(String str, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final long timeout = this.receiveTimeout;
        return (T) receiveAndConvert(str, timeout, parameterizedTypeReference);
    }

    /**
     * Receives from the required queue with timeout {@code j} and converts the message to the type
     * described by {@code parameterizedTypeReference}.
     *
     * @return the converted payload, or {@code null} when no message was received
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public <T> T receiveAndConvert(long j, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final String queue = getRequiredQueue();
        return (T) receiveAndConvert(queue, j, parameterizedTypeReference);
    }

    /**
     * Receives from queue {@code str} (non-blocking when {@code j} is zero, timed otherwise) and
     * converts the message with the required smart message converter to the type described by
     * {@code parameterizedTypeReference}.
     *
     * @return the converted payload, or {@code null} when no message was received
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public <T> T receiveAndConvert(String str, long j, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final Message received;
        if (j == 0) {
            received = doReceiveNoWait(str);
        } else {
            received = receive(str, j);
        }
        if (received == null) {
            return null;
        }
        return (T) getRequiredSmartMessageConverter().fromMessage(received, parameterizedTypeReference);
    }

    /**
     * Runs a receive-and-reply cycle against the required queue with the given callback.
     *
     * @return the outcome reported by the queue-specific overload
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> receiveAndReplyCallback) throws AmqpException {
        final String queue = getRequiredQueue();
        return receiveAndReply(queue, receiveAndReplyCallback);
    }

    /**
     * Runs a receive-and-reply cycle against queue {@code str}, resolving the reply address with the
     * template's default reply-to address callback.
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public <R, S> boolean receiveAndReply(String str, ReceiveAndReplyCallback<R, S> receiveAndReplyCallback) throws AmqpException {
        return receiveAndReply(
                str,
                receiveAndReplyCallback,
                this.defaultReplyToAddressCallback);
    }

    /**
     * Runs a receive-and-reply cycle against the required queue; {@code str} and {@code str2} are
     * forwarded to the queue-specific overload, which builds the reply {@link Address} from them.
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> receiveAndReplyCallback, String str, String str2) throws AmqpException {
        final String queue = getRequiredQueue();
        return receiveAndReply(queue, receiveAndReplyCallback, str, str2);
    }

    /**
     * Runs a receive-and-reply cycle against queue {@code str}, always replying to the
     * {@link Address} constructed from {@code str2} and {@code str3}.
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public <R, S> boolean receiveAndReply(String str, ReceiveAndReplyCallback<R, S> receiveAndReplyCallback, String str2, String str3) throws AmqpException {
        // A fresh Address is created per reply; the request and reply themselves are not consulted.
        final ReplyToAddressCallback<S> fixedAddress = (request, reply) -> new Address(str2, str3);
        return receiveAndReply(str, receiveAndReplyCallback, fixedAddress);
    }

    /**
     * Runs a receive-and-reply cycle against the required queue, resolving the reply address with
     * {@code replyToAddressCallback}.
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> receiveAndReplyCallback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
        final String queue = getRequiredQueue();
        return receiveAndReply(queue, receiveAndReplyCallback, replyToAddressCallback);
    }

    /**
     * Runs a receive-and-reply cycle against queue {@code str}; the work is done by
     * {@code doReceiveAndReply}.
     *
     * @return {@code true} when a message was received and handled, {@code false} otherwise
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    public <R, S> boolean receiveAndReply(String str, ReceiveAndReplyCallback<R, S> receiveAndReplyCallback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
        final boolean handled = doReceiveAndReply(str, receiveAndReplyCallback, replyToAddressCallback);
        return handled;
    }

    /**
     * Receives one message from queue {@code str} and, when one arrives, passes it to
     * {@code sendReply} on the same channel.
     *
     * @return {@code false} when no message was received or the channel callback produced no
     *         result; otherwise the value returned by {@code sendReply}
     */
    private <R, S> boolean doReceiveAndReply(String str, ReceiveAndReplyCallback<R, S> receiveAndReplyCallback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
        Boolean outcome = (Boolean) execute(channel -> {
            Message request = receiveForReply(str, channel);
            if (request == null) {
                return false;
            }
            return Boolean.valueOf(sendReply(receiveAndReplyCallback, replyToAddressCallback, channel, request));
        }, obtainTargetConnectionFactory(this.receiveConnectionFactorySelectorExpression, str));
        return outcome != null && outcome.booleanValue();
    }

    /**
     * Obtains the request message for a receive-and-reply cycle from queue {@code str} on the
     * supplied channel.
     *
     * <p>With a zero receive timeout a single {@code basicGet} is issued; otherwise a delivery is
     * consumed with the configured timeout. No commit is issued here: a locally transacted channel
     * only acknowledges, and a transacted but not locally transacted channel registers the delivery
     * tag with {@code ConnectionFactoryUtils}.
     *
     * @return the received message, or {@code null} when nothing was received
     */
    @Nullable
    private Message receiveForReply(String str, Channel channel) throws IOException {
        final boolean transacted = isChannelTransacted();
        final boolean locallyTransacted = isChannelLocallyTransacted(channel);
        // Transacted, but the transaction is not owned by this channel.
        final boolean externallyTransacted = transacted && !locallyTransacted;
        final long timeout = this.receiveTimeout;
        Message request = null;
        if (timeout != 0) {
            Delivery delivery = consumeDelivery(channel, str, timeout);
            if (delivery != null) {
                long tag = delivery.getEnvelope().getDeliveryTag();
                if (externallyTransacted) {
                    ConnectionFactoryUtils.registerDeliveryTag(getConnectionFactory(), channel, Long.valueOf(tag));
                } else {
                    channel.basicAck(tag, false);
                }
                request = buildMessageFromDelivery(delivery);
            }
        } else {
            // Auto-ack only when the channel is not transacted.
            GetResponse response = channel.basicGet(str, !transacted);
            if (response != null) {
                long tag = response.getEnvelope().getDeliveryTag();
                if (locallyTransacted) {
                    channel.basicAck(tag, false);
                } else if (externallyTransacted) {
                    ConnectionFactoryUtils.registerDeliveryTag(getConnectionFactory(), channel, Long.valueOf(tag));
                }
                request = buildMessageFromResponse(response);
            }
        }
        logReceived("Received: ", request);
        return request;
    }

    /**
     * Consumes a single delivery from queue {@code str} on the given channel.
     *
     * <p>A temporary consumer is created and its delivery awaited through a future. A shutdown
     * listener fails that future when the channel closes abnormally. Whatever the outcome, the
     * consumer is cancelled while the channel is still open (unless the failure is a
     * {@code ConsumerCancelledException}) and the shutdown listener is removed.
     *
     * @param channel the channel to consume on
     * @param str the queue name
     * @param j wait time in milliseconds; a negative value waits without a time limit and passes
     *          {@code 10000L} to {@code createConsumer}
     * @return the delivery, or {@code null} when the wait timed out or the thread was interrupted
     * @throws IOException declared for the channel operations used here
     */
    @Nullable
    private Delivery consumeDelivery(Channel channel, String str, long j) throws IOException {
        Delivery delivery = null;
        RuntimeException failure = null;
        CompletableFuture<Delivery> completableFuture = new CompletableFuture<>();
        ShutdownListener shutdownListener = shutdownSignalException -> {
            if (!RabbitUtils.isNormalChannelClose(shutdownSignalException)) {
                completableFuture.completeExceptionally(shutdownSignalException);
            }
        };
        channel.addShutdownListener(shutdownListener);
        ClosingRecoveryListener.addRecoveryListenerIfNecessary(channel);
        DefaultConsumer consumer = null;
        try {
            consumer = createConsumer(str, channel, completableFuture, j < 0 ? 10000L : j);
            delivery = j < 0 ? completableFuture.get() : completableFuture.get(j, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            this.logger.error("Consumer failed to receive message: " + consumer, cause);
            // Remember the translated failure so the cleanup below can recognise a cancelled consumer.
            failure = RabbitExceptionTranslator.convertRabbitAccessException(cause);
            throw failure;
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller; no delivery is returned.
            Thread.currentThread().interrupt();
        } catch (TimeoutException e) {
            // A delivery may still be in flight, so flag the channel for a physical close.
            RabbitUtils.setPhysicalCloseRequired(channel, true);
        } finally {
            if (consumer != null && !(failure instanceof ConsumerCancelledException) && channel.isOpen()) {
                cancelConsumerQuietly(channel, consumer);
            }
            try {
                channel.removeShutdownListener(shutdownListener);
            } catch (Exception ignored) {
                // Best effort: removal may fail if the channel has already gone away.
            }
        }
        return delivery;
    }

    /**
     * Logs a received message at debug level, prefixed with {@code str}; a {@code null} message is
     * logged as "no message". Does nothing when debug logging is disabled.
     */
    protected void logReceived(String str, @Nullable Message message) {
        if (!this.logger.isDebugEnabled()) {
            return;
        }
        if (message != null) {
            this.logger.debug(str + message);
            return;
        }
        this.logger.debug(str + "no message");
    }

    /**
     * Hands a received message to the callback and sends the callback's result as the reply.
     *
     * <p>A {@link ReceiveAndReplyMessageCallback} receives the raw {@link Message}; any other
     * callback receives the payload produced by the required message converter. A non-null result
     * is sent through {@code doSendReply}; with a {@code null} result a locally transacted channel
     * is committed instead.
     *
     * @param receiveAndReplyCallback the callback producing the reply
     * @param replyToAddressCallback resolves the address the reply is sent to
     * @param channel the channel the request was received on
     * @param message the received request message
     * @return always {@code true}
     * @throws IllegalArgumentException when the callback rejects the received object with a
     *         {@link ClassCastException} raised at the {@code handle} call boundary
     * @throws IOException declared for the channel operations used here
     */
    @SuppressWarnings("unchecked")
    private <R, S> boolean sendReply(ReceiveAndReplyCallback<R, S> receiveAndReplyCallback, ReplyToAddressCallback<S> replyToAddressCallback, Channel channel, Message message) throws IOException {
        Object received = message;
        if (!ReceiveAndReplyMessageCallback.class.isAssignableFrom(receiveAndReplyCallback.getClass())) {
            received = getRequiredMessageConverter().fromMessage(message);
        }
        S reply;
        try {
            // Unchecked by design: a payload/callback type mismatch surfaces as ClassCastException below.
            reply = receiveAndReplyCallback.handle((R) received);
        } catch (ClassCastException e) {
            StackTraceElement[] stackTrace = e.getStackTrace();
            // The JVM may supply a short or empty trace and a null file name; never let the
            // inspection itself throw and hide the original exception.
            if (stackTrace.length > 1
                    && "handle".equals(stackTrace[0].getMethodName())
                    && "RabbitTemplate.java".equals(stackTrace[1].getFileName())) {
                throw new IllegalArgumentException("ReceiveAndReplyCallback '" + receiveAndReplyCallback + "' can't handle received object '" + received + "'", e);
            }
            throw e;
        }
        if (reply != null) {
            doSendReply(replyToAddressCallback, channel, message, reply);
        } else if (isChannelLocallyTransacted(channel)) {
            channel.txCommit();
        }
        return true;
    }

    /**
     * Sends the reply {@code s} for the request {@code message} to the address chosen by
     * {@code replyToAddressCallback}.
     *
     * <p>Correlation is copied from the request: when a correlation key is configured and the
     * request carries that header, the header is copied to the reply; otherwise the reply's
     * correlation id is set from the value looked up on the request, falling back to the request's
     * message id when that value is {@code null}.
     */
    private <S> void doSendReply(ReplyToAddressCallback<S> replyToAddressCallback, Channel channel, Message message, S s) throws IOException {
        final Address replyTo = replyToAddressCallback.getReplyToAddress(message, s);
        final Message reply = convertMessageIfNecessary(s);
        final MessageProperties requestProperties = message.getMessageProperties();
        final MessageProperties replyProperties = reply.getMessageProperties();

        Object correlation;
        if (this.correlationKey == null) {
            correlation = requestProperties.getCorrelationId();
        } else {
            correlation = requestProperties.getHeaders().get(this.correlationKey);
        }

        if (this.correlationKey != null && correlation != null) {
            // Custom correlation header present on the request: mirror it on the reply.
            replyProperties.setHeader(this.correlationKey, correlation);
        } else {
            if (correlation == null) {
                String messageId = requestProperties.getMessageId();
                if (messageId != null) {
                    correlation = messageId;
                }
            }
            replyProperties.setCorrelationId((String) correlation);
        }

        final String replyExchange = replyTo.getExchangeName();
        final String replyRoutingKey = replyTo.getRoutingKey();
        final boolean mandatory = this.returnsCallback != null && isMandatoryFor(reply).booleanValue();
        doSend(channel, replyExchange, replyRoutingKey, reply, mandatory, null);
    }

    /**
     * Sends {@code message} using the template's configured exchange and routing key and returns
     * the reply; no correlation data is supplied.
     *
     * @return the reply message, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Message sendAndReceive(Message message) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return sendAndReceive(message, noCorrelationData);
    }

    /**
     * Sends {@code message} using the template's configured exchange and routing key and returns
     * the reply.
     *
     * @return the reply message, or {@code null} when there is none
     */
    @Nullable
    public Message sendAndReceive(Message message, @Nullable CorrelationData correlationData) throws AmqpException {
        final String defaultExchange = this.exchange;
        final String defaultRoutingKey = this.routingKey;
        return doSendAndReceive(defaultExchange, defaultRoutingKey, message, correlationData);
    }

    /**
     * Sends {@code message} to the template's configured exchange with routing key {@code str} and
     * returns the reply; no correlation data is supplied.
     *
     * @return the reply message, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Message sendAndReceive(String str, Message message) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return sendAndReceive(str, message, noCorrelationData);
    }

    /**
     * Sends {@code message} to the template's configured exchange with routing key {@code str} and
     * returns the reply.
     *
     * @return the reply message, or {@code null} when there is none
     */
    @Nullable
    public Message sendAndReceive(String str, Message message, @Nullable CorrelationData correlationData) throws AmqpException {
        final String defaultExchange = this.exchange;
        return doSendAndReceive(defaultExchange, str, message, correlationData);
    }

    /**
     * Sends {@code message} to exchange {@code str} with routing key {@code str2} and returns the
     * reply; no correlation data is supplied.
     *
     * @return the reply message, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Message sendAndReceive(String str, String str2, Message message) throws AmqpException {
        // Same as the four-argument overload with the correlation data left out.
        return sendAndReceive(
                str,
                str2,
                message,
                null);
    }

    /**
     * Sends {@code message} to exchange {@code str} with routing key {@code str2} and returns the
     * reply obtained by {@code doSendAndReceive}.
     *
     * @return the reply message, or {@code null} when there is none
     */
    @Nullable
    public Message sendAndReceive(String str, String str2, Message message, @Nullable CorrelationData correlationData) throws AmqpException {
        final Message reply = doSendAndReceive(str, str2, message, correlationData);
        return reply;
    }

    /**
     * Converts and sends {@code obj} using the template's configured exchange and routing key, then
     * returns the converted reply; no correlation data is supplied.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Object convertSendAndReceive(Object obj) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return convertSendAndReceive(obj, noCorrelationData);
    }

    /**
     * Converts and sends {@code obj} using the template's configured exchange and routing key, with
     * no message post-processor, then returns the converted reply.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    @Nullable
    public Object convertSendAndReceive(Object obj, @Nullable CorrelationData correlationData) throws AmqpException {
        final String defaultExchange = this.exchange;
        final String defaultRoutingKey = this.routingKey;
        return convertSendAndReceive(defaultExchange, defaultRoutingKey, obj, null, correlationData);
    }

    /**
     * Converts and sends {@code obj} to the template's configured exchange with routing key
     * {@code str}, then returns the converted reply; no correlation data is supplied.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Object convertSendAndReceive(String str, Object obj) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return convertSendAndReceive(str, obj, noCorrelationData);
    }

    /**
     * Converts and sends {@code obj} to the template's configured exchange with routing key
     * {@code str}, with no message post-processor, then returns the converted reply.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    @Nullable
    public Object convertSendAndReceive(String str, Object obj, @Nullable CorrelationData correlationData) throws AmqpException {
        final String defaultExchange = this.exchange;
        return convertSendAndReceive(defaultExchange, str, obj, null, correlationData);
    }

    /**
     * Converts and sends {@code obj} to exchange {@code str} with routing key {@code str2}, then
     * returns the converted reply; no correlation data is supplied.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Object convertSendAndReceive(String str, String str2, Object obj) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return convertSendAndReceive(str, str2, obj, noCorrelationData);
    }

    /**
     * Converts and sends {@code obj} to exchange {@code str} with routing key {@code str2}, with no
     * message post-processor, then returns the converted reply.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    @Nullable
    public Object convertSendAndReceive(String str, String str2, Object obj, @Nullable CorrelationData correlationData) throws AmqpException {
        // The fourth argument is the (absent) message post-processor.
        return convertSendAndReceive(
                str,
                str2,
                obj,
                null,
                correlationData);
    }

    /**
     * Converts {@code obj}, applies {@code messagePostProcessor}, sends using the template's
     * configured exchange and routing key, and returns the converted reply; no correlation data is
     * supplied.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Object convertSendAndReceive(Object obj, MessagePostProcessor messagePostProcessor) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return convertSendAndReceive(obj, messagePostProcessor, noCorrelationData);
    }

    /**
     * Converts {@code obj}, applies {@code messagePostProcessor}, sends using the template's
     * configured exchange and routing key, and returns the converted reply.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    @Nullable
    public Object convertSendAndReceive(Object obj, MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) throws AmqpException {
        final String defaultExchange = this.exchange;
        final String defaultRoutingKey = this.routingKey;
        return convertSendAndReceive(defaultExchange, defaultRoutingKey, obj, messagePostProcessor, correlationData);
    }

    /**
     * Converts {@code obj}, applies {@code messagePostProcessor}, sends to the template's configured
     * exchange with routing key {@code str}, and returns the converted reply; no correlation data
     * is supplied.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Object convertSendAndReceive(String str, Object obj, MessagePostProcessor messagePostProcessor) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return convertSendAndReceive(str, obj, messagePostProcessor, noCorrelationData);
    }

    /**
     * Converts {@code obj}, applies {@code messagePostProcessor}, sends to the template's configured
     * exchange with routing key {@code str}, and returns the converted reply.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    @Nullable
    public Object convertSendAndReceive(String str, Object obj, MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) throws AmqpException {
        final String defaultExchange = this.exchange;
        return convertSendAndReceive(defaultExchange, str, obj, messagePostProcessor, correlationData);
    }

    /**
     * Converts {@code obj}, applies {@code messagePostProcessor}, sends to exchange {@code str} with
     * routing key {@code str2}, and returns the converted reply; no correlation data is supplied.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public Object convertSendAndReceive(String str, String str2, Object obj, MessagePostProcessor messagePostProcessor) throws AmqpException {
        // Same as the five-argument overload with the correlation data left out.
        return convertSendAndReceive(
                str,
                str2,
                obj,
                messagePostProcessor,
                null);
    }

    /**
     * Converts {@code obj}, optionally post-processes it, sends it to exchange {@code str} with
     * routing key {@code str2}, and converts the reply with the required message converter.
     *
     * @return the converted reply, or {@code null} when no reply message was obtained
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    @Nullable
    public Object convertSendAndReceive(String str, String str2, Object obj, @Nullable MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) throws AmqpException {
        final Message reply = convertSendAndReceiveRaw(str, str2, obj, messagePostProcessor, correlationData);
        if (reply != null) {
            return getRequiredMessageConverter().fromMessage(reply);
        }
        return null;
    }

    /**
     * Converts and sends {@code obj} using the template's configured exchange and routing key and
     * returns the reply converted to the type described by {@code parameterizedTypeReference}; no
     * correlation data is supplied.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public <T> T convertSendAndReceiveAsType(Object obj, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return (T) convertSendAndReceiveAsType(obj, noCorrelationData, parameterizedTypeReference);
    }

    /**
     * Converts and sends {@code obj} using the template's configured exchange and routing key, with
     * no message post-processor, and returns the reply converted to the requested type.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    @Nullable
    public <T> T convertSendAndReceiveAsType(Object obj, @Nullable CorrelationData correlationData, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final String defaultExchange = this.exchange;
        final String defaultRoutingKey = this.routingKey;
        return (T) convertSendAndReceiveAsType(defaultExchange, defaultRoutingKey, obj, null, correlationData, parameterizedTypeReference);
    }

    /**
     * Converts and sends {@code obj} to the template's configured exchange with routing key
     * {@code str} and returns the reply converted to the requested type; no correlation data is
     * supplied.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public <T> T convertSendAndReceiveAsType(String str, Object obj, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return (T) convertSendAndReceiveAsType(str, obj, noCorrelationData, parameterizedTypeReference);
    }

    /**
     * Converts and sends {@code obj} to the template's configured exchange with routing key
     * {@code str}, with no message post-processor, and returns the reply converted to the
     * requested type.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    @Nullable
    public <T> T convertSendAndReceiveAsType(String str, Object obj, @Nullable CorrelationData correlationData, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final String defaultExchange = this.exchange;
        return (T) convertSendAndReceiveAsType(defaultExchange, str, obj, null, correlationData, parameterizedTypeReference);
    }

    /**
     * Converts and sends {@code obj} to exchange {@code str} with routing key {@code str2} and
     * returns the reply converted to the requested type; no correlation data is supplied.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public <T> T convertSendAndReceiveAsType(String str, String str2, Object obj, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        // The typed null selects the CorrelationData-accepting overload.
        final CorrelationData noCorrelationData = null;
        return (T) convertSendAndReceiveAsType(str, str2, obj, noCorrelationData, parameterizedTypeReference);
    }

    /**
     * Converts {@code obj}, optionally post-processes it, sends using the template's configured
     * exchange and routing key, and returns the reply converted to the requested type; no
     * correlation data is supplied.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public <T> T convertSendAndReceiveAsType(Object obj, @Nullable MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return (T) convertSendAndReceiveAsType(obj, messagePostProcessor, noCorrelationData, parameterizedTypeReference);
    }

    /**
     * Converts {@code obj}, optionally post-processes it, sends using the template's configured
     * exchange and routing key, and returns the reply converted to the requested type.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    @Nullable
    public <T> T convertSendAndReceiveAsType(Object obj, @Nullable MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final String defaultExchange = this.exchange;
        final String defaultRoutingKey = this.routingKey;
        return (T) convertSendAndReceiveAsType(defaultExchange, defaultRoutingKey, obj, messagePostProcessor, correlationData, parameterizedTypeReference);
    }

    /**
     * Converts {@code obj}, optionally post-processes it, sends to the template's configured
     * exchange with routing key {@code str}, and returns the reply converted to the requested type;
     * no correlation data is supplied.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public <T> T convertSendAndReceiveAsType(String str, Object obj, @Nullable MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return (T) convertSendAndReceiveAsType(str, obj, messagePostProcessor, noCorrelationData, parameterizedTypeReference);
    }

    /**
     * Converts {@code obj}, optionally post-processes it, sends to the template's configured
     * exchange with routing key {@code str}, and returns the reply converted to the requested type.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    @Nullable
    public <T> T convertSendAndReceiveAsType(String str, Object obj, @Nullable MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final String defaultExchange = this.exchange;
        return (T) convertSendAndReceiveAsType(defaultExchange, str, obj, messagePostProcessor, correlationData, parameterizedTypeReference);
    }

    /**
     * Converts {@code obj}, applies {@code messagePostProcessor}, sends to exchange {@code str} with
     * routing key {@code str2}, and returns the reply converted to the requested type; no
     * correlation data is supplied.
     *
     * @return the converted reply, or {@code null} when there is none
     */
    @Override // org.springframework.amqp.core.AmqpTemplate
    @Nullable
    public <T> T convertSendAndReceiveAsType(String str, String str2, Object obj, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        // Same as the six-argument overload with the correlation data left out.
        return (T) convertSendAndReceiveAsType(
                str,
                str2,
                obj,
                messagePostProcessor,
                null,
                parameterizedTypeReference);
    }

    /**
     * Converts {@code obj}, optionally post-processes it, sends it to exchange {@code str} with
     * routing key {@code str2}, and converts the reply with the required smart message converter to
     * the type described by {@code parameterizedTypeReference}.
     *
     * @return the converted reply, or {@code null} when no reply message was obtained
     */
    @Override // org.springframework.amqp.rabbit.core.RabbitOperations
    @Nullable
    public <T> T convertSendAndReceiveAsType(String str, String str2, Object obj, @Nullable MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData, ParameterizedTypeReference<T> parameterizedTypeReference) throws AmqpException {
        final Message reply = convertSendAndReceiveRaw(str, str2, obj, messagePostProcessor, correlationData);
        if (reply != null) {
            return (T) getRequiredSmartMessageConverter().fromMessage(reply, parameterizedTypeReference);
        }
        return null;
    }

    /**
     * Converts {@code obj} to a {@link Message} when necessary, applies the post-processor when one
     * is given, and performs the send-and-receive, returning the unconverted reply.
     *
     * @return the raw reply message, or {@code null} when there is none
     */
    @Nullable
    protected Message convertSendAndReceiveRaw(String str, String str2, Object obj, @Nullable MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) {
        final Message converted = convertMessageIfNecessary(obj);
        final Message request;
        if (messagePostProcessor == null) {
            request = converted;
        } else {
            // The post-processor sees null-safe variants of the exchange and routing key.
            request = messagePostProcessor.postProcessMessage(
                    converted, correlationData, nullSafeExchange(str), nullSafeRoutingKey(str2));
        }
        return doSendAndReceive(str, str2, request, correlationData);
    }

    /**
     * Returns {@code obj} unchanged when it already is a {@link Message}; otherwise converts it with
     * the required message converter using fresh {@link MessageProperties}.
     */
    protected Message convertMessageIfNecessary(Object obj) {
        if (obj instanceof Message) {
            return (Message) obj;
        }
        return getRequiredMessageConverter().toMessage(obj, new MessageProperties());
    }

    /**
     * Chooses the reply mechanism and performs the send-and-receive.
     *
     * <p>The fast reply-to evaluation runs at most once per template: the flag is tested, and
     * tested again while holding the lock on this instance, before {@code evaluateFastReplyTo()}
     * is called. Afterwards the request goes through the direct reply-to container, a temporary
     * reply consumer, or the fixed reply address, depending on the template's flags.
     *
     * @return the reply message, or {@code null} when there is none
     */
    @Nullable
    protected Message doSendAndReceive(String str, String str2, Message message, @Nullable CorrelationData correlationData) {
        if (!this.evaluatedFastReplyTo) {
            synchronized (this) {
                if (!this.evaluatedFastReplyTo) {
                    evaluateFastReplyTo();
                }
            }
        }
        if (this.usingFastReplyTo && this.useDirectReplyToContainer) {
            return doSendAndReceiveWithDirect(str, str2, message, correlationData);
        }
        if (this.replyAddress == null || this.usingFastReplyTo) {
            return doSendAndReceiveWithTemporary(str, str2, message, correlationData);
        }
        return doSendAndReceiveWithFixed(str, str2, message, correlationData);
    }

    /**
     * Performs a send-and-receive using a consumer that exists only for this request.
     *
     * <p>The reply is consumed either from {@code Address.AMQ_RABBITMQ_REPLY_TO} (when fast
     * reply-to is in use) or from a queue declared on the channel for this call. The request's
     * {@code replyTo} property is set to that name, so the request must not already carry one.
     *
     * <p>Everything registered for the request (the pending-reply entry, the shutdown listener and
     * the reply consumer) is released in a single {@code finally} block, which also covers failures
     * while declaring the queue or starting the consumer.
     *
     * @param str the exchange to send to
     * @param str2 the routing key
     * @param message the request message; its {@code replyTo} property is overwritten
     * @param correlationData optional correlation data handed to {@code exchangeMessages}
     * @return the reply message, or {@code null} when there is none
     */
    @Nullable
    protected Message doSendAndReceiveWithTemporary(String str, String str2, Message message, @Nullable CorrelationData correlationData) {
        return (Message) execute(channel -> {
            // Check the precondition first so that a rejected request leaves nothing registered.
            Assert.isNull(message.getMessageProperties().getReplyTo(), "Send-and-receive methods can only be used if the Message does not already have a replyTo property.");
            final PendingReply pendingReply = new PendingReply();
            String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
            this.replyHolder.putIfAbsent(messageTag, pendingReply);
            ShutdownListener shutdownListener = null;
            TemplateConsumer activeConsumer = null;
            try {
                String replyTo = this.usingFastReplyTo ? Address.AMQ_RABBITMQ_REPLY_TO : channel.queueDeclare().getQueue();
                message.getMessageProperties().setReplyTo(replyTo);
                String consumerTag = UUID.randomUUID().toString();
                TemplateConsumer templateConsumer = new TemplateConsumer(channel) { // from class: org.springframework.amqp.rabbit.core.RabbitTemplate.1
                    @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
                    public void handleDelivery(String str3, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
                        Message message2 = new Message(bArr, RabbitTemplate.this.messagePropertiesConverter.toMessageProperties(basicProperties, envelope, RabbitTemplate.this.encoding));
                        if (RabbitTemplate.this.logger.isTraceEnabled()) {
                            RabbitTemplate.this.logger.trace("Message received " + message2);
                        }
                        if (RabbitTemplate.this.afterReceivePostProcessors != null) {
                            Iterator<MessagePostProcessor> it = RabbitTemplate.this.afterReceivePostProcessors.iterator();
                            while (it.hasNext()) {
                                message2 = it.next().postProcessMessage(message2);
                            }
                        }
                        pendingReply.reply(message2);
                    }
                };
                ClosingRecoveryListener.addRecoveryListenerIfNecessary(channel);
                // Fail the pending reply when the channel closes abnormally.
                shutdownListener = shutdownSignalException -> {
                    if (!RabbitUtils.isNormalChannelClose(shutdownSignalException)) {
                        pendingReply.completeExceptionally(shutdownSignalException);
                    }
                };
                channel.addShutdownListener(shutdownListener);
                channel.basicConsume(replyTo, true, consumerTag, this.noLocalReplyConsumer, true, null, templateConsumer);
                // Only a consumer that was actually started needs cancelling.
                activeConsumer = templateConsumer;
                return exchangeMessages(str, str2, message, correlationData, channel, pendingReply, messageTag);
            } finally {
                this.replyHolder.remove(messageTag);
                if (activeConsumer != null && channel.isOpen()) {
                    cancelConsumerQuietly(channel, activeConsumer);
                }
                if (shutdownListener != null) {
                    try {
                        channel.removeShutdownListener(shutdownListener);
                    } catch (Exception ignored) {
                        // Best effort: removal may fail if the channel has already gone away.
                    }
                }
            }
        }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
    }

    /** Cancels the given consumer on the channel by its consumer tag via {@code RabbitUtils.cancel}. */
    private void cancelConsumerQuietly(Channel channel, DefaultConsumer defaultConsumer) {
        final String consumerTag = defaultConsumer.getConsumerTag();
        RabbitUtils.cancel(channel, consumerTag);
    }

    /**
     * Performs a send-and-receive whose reply arrives through this template acting as a listener.
     *
     * @return the reply message, or {@code null} when there is none
     * @throws IllegalStateException via {@code Assert.state} when the template is not flagged as a listener
     */
    @Nullable
    protected Message doSendAndReceiveWithFixed(String str, String str2, Message message, @Nullable CorrelationData correlationData) {
        // The failure message is built lazily, only when the state check fails.
        Assert.state(this.isListener, (Supplier<String>) () ->
                "RabbitTemplate is not configured as MessageListener - cannot use a 'replyAddress': " + this.replyAddress);
        return (Message) execute(
                channel -> doSendAndReceiveAsListener(str, str2, message, correlationData, channel, false),
                obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
    }

    /**
     * Performs a send-and-receive over a channel borrowed from the direct reply-to listener
     * container associated with the target connection factory.
     *
     * <p>The container is looked up in {@code directReplyToContainers} and created on demand. The
     * borrowed channel holder is released exactly once, in {@code finally}. When the channel is
     * used for correlation, the release asks for the consumer to be cancelled if no reply was
     * obtained or if the exchange failed.
     *
     * @param str the exchange to send to
     * @param str2 the routing key
     * @param message the request message
     * @param correlationData optional correlation data handed to {@code doSendAndReceiveAsListener}
     * @return the reply message, or {@code null} when there is none
     */
    @Nullable
    private Message doSendAndReceiveWithDirect(String str, String str2, Message message, @Nullable CorrelationData correlationData) {
        ConnectionFactory obtainTargetConnectionFactory = obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message);
        if (this.usePublisherConnection && obtainTargetConnectionFactory.getPublisherConnectionFactory() != null) {
            obtainTargetConnectionFactory = obtainTargetConnectionFactory.getPublisherConnectionFactory();
        }
        DirectReplyToMessageListenerContainer directReplyToMessageListenerContainer = this.directReplyToContainers.get(obtainTargetConnectionFactory);
        if (directReplyToMessageListenerContainer == null) {
            directReplyToMessageListenerContainer = createReplyToContainer(obtainTargetConnectionFactory);
        }
        DirectReplyToMessageListenerContainer.ChannelHolder channelHolder = directReplyToMessageListenerContainer.getChannelHolder();
        boolean cancelConsumer = false;
        try {
            Channel channel = channelHolder.getChannel();
            if (isPublisherConfirmsOrReturns(obtainTargetConnectionFactory)) {
                addListener(channel);
            }
            Message reply = doSendAndReceiveAsListener(str, str2, message, correlationData, channel, this.useChannelForCorrelation);
            if (reply == null && this.useChannelForCorrelation) {
                cancelConsumer = true;
            }
            return reply;
        } catch (Exception e) {
            // After a failed exchange the consumer must not be handed out again when replies are
            // correlated by channel.
            if (this.useChannelForCorrelation) {
                cancelConsumer = true;
            }
            throw RabbitExceptionTranslator.convertRabbitAccessException(e);
        } finally {
            directReplyToMessageListenerContainer.releaseConsumerFor(channelHolder, cancelConsumer, "Reply failed; consumer cannot be reused");
        }
    }

    /**
     * Returns the reply container registered for the given connection factory, creating,
     * configuring, starting and registering one if none exists yet.
     *
     * <p>The lookup and creation happen while holding the monitor of
     * {@code directReplyToContainers}. When a new container is created, {@code replyAddress} is
     * set to {@code Address.AMQ_RABBITMQ_REPLY_TO}.
     *
     * @param connectionFactory the connection factory the container consumes from
     * @return the existing or newly started container
     */
    private DirectReplyToMessageListenerContainer createReplyToContainer(ConnectionFactory connectionFactory) {
        synchronized (this.directReplyToContainers) {
            DirectReplyToMessageListenerContainer existing = this.directReplyToContainers.get(connectionFactory);
            if (existing != null) {
                return existing;
            }
            DirectReplyToMessageListenerContainer container = new DirectReplyToMessageListenerContainer(connectionFactory);
            container.setMessageListener(this);
            container.setBeanName(this.beanName + "#" + this.containerInstance.getAndIncrement());
            if (this.taskExecutor != null) {
                container.setTaskExecutor(this.taskExecutor);
            }
            container.setNoLocal(this.noLocalReplyConsumer);
            if (this.replyErrorHandler != null) {
                container.setErrorHandler(this.replyErrorHandler);
            }
            container.start();
            this.directReplyToContainers.put(connectionFactory, container);
            this.replyAddress = Address.AMQ_RABBITMQ_REPLY_TO;
            return container;
        }
    }

    /**
     * Performs one request/reply exchange in which the reply is delivered to this template
     * acting as a listener.
     *
     * <p>A {@code PendingReply} is registered in {@code replyHolder} under the message tag (and,
     * when {@code z} is set, also under the channel) before sending, and is always removed again
     * afterwards. The tag is the user-supplied correlation value when {@code userCorrelationId}
     * is enabled and present; otherwise it is generated from {@code messageTagProvider}.
     *
     * @param str the exchange to publish to
     * @param str2 the routing key to publish with
     * @param message the request message; its reply-to/correlation properties are overwritten
     * @param correlationData optional publisher-confirm correlation data
     * @param channel the channel to publish on
     * @param z whether the pending reply is additionally keyed by the channel
     * @return the (post-processed) reply, or {@code null} if none was obtained
     * @throws Exception any failure raised while sending or waiting
     */
    @Nullable
    private Message doSendAndReceiveAsListener(String str, String str2, Message message, @Nullable CorrelationData correlationData, Channel channel, boolean z) throws Exception {
        PendingReply pendingReply = new PendingReply();
        String messageTag = null;
        if (this.userCorrelationId) {
            if (this.correlationKey != null) {
                messageTag = (String) message.getMessageProperties().getHeaders().get(this.correlationKey);
            } else {
                messageTag = message.getMessageProperties().getCorrelationId();
            }
        }
        if (messageTag == null) {
            messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
        }
        saveAndSetProperties(message, pendingReply, messageTag);
        this.replyHolder.put(messageTag, pendingReply);
        if (z) {
            this.replyHolder.put(channel, pendingReply);
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Sending message with tag " + messageTag);
        }
        try {
            Message reply = exchangeMessages(str, str2, message, correlationData, channel, pendingReply, messageTag);
            if (reply != null && this.afterReceivePostProcessors != null) {
                for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
                    reply = processor.postProcessMessage(reply);
                }
            }
            return reply;
        } finally {
            this.replyHolder.remove(messageTag);
            if (z) {
                this.replyHolder.remove(channel);
            }
        }
    }

    /**
     * Stores the request's current reply-to (and, unless user correlation ids are in use, its
     * current correlation value) in the pending reply, then overwrites them with the template's
     * reply address and the supplied message tag.
     *
     * @param message the outgoing request whose properties are modified in place
     * @param pendingReply receives the saved values
     * @param str the message tag written as the correlation value
     */
    private void saveAndSetProperties(Message message, PendingReply pendingReply, String str) {
        MessageProperties props = message.getMessageProperties();
        String originalReplyTo = props.getReplyTo();
        pendingReply.setSavedReplyTo(originalReplyTo);
        if (StringUtils.hasLength(originalReplyTo) && this.logger.isDebugEnabled()) {
            this.logger.debug("Replacing replyTo header: " + originalReplyTo + " in favor of template's configured reply-queue: " + this.replyAddress);
        }
        props.setReplyTo(this.replyAddress);
        if (!this.userCorrelationId) {
            // The correlation value lives either in the standard property or in a custom header.
            boolean standardProperty = this.correlationKey == null;
            String savedCorrelation = standardProperty
                    ? props.getCorrelationId()
                    : (String) props.getHeaders().get(this.correlationKey);
            pendingReply.setSavedCorrelation(savedCorrelation);
            if (standardProperty) {
                props.setCorrelationId(str);
            } else {
                props.setHeader(this.correlationKey, str);
            }
        }
    }

    /**
     * Publishes the request and waits on the pending reply.
     *
     * <p>When the message is mandatory and no {@code returnsCallback} is configured, the message
     * tag is added under {@code RETURN_CORRELATION_KEY} before sending. A negative
     * {@code replyTimeout} uses the no-argument {@code pendingReply.get()}; otherwise the timed
     * variant is used. {@link #replyTimedOut(String)} is invoked when no reply was obtained.
     *
     * @param str the exchange to publish to
     * @param str2 the routing key to publish with
     * @param message the request message
     * @param correlationData optional publisher-confirm correlation data
     * @param channel the channel to publish on
     * @param pendingReply the holder the reply is delivered to
     * @param str3 the message tag of this exchange
     * @return the reply, or {@code null} if none was obtained
     * @throws IOException declared for the send/wait operations
     * @throws InterruptedException if interrupted while waiting for the reply
     */
    @Nullable
    private Message exchangeMessages(String str, String str2, Message message, @Nullable CorrelationData correlationData, Channel channel, PendingReply pendingReply, String str3) throws IOException, InterruptedException {
        boolean mandatory = isMandatoryFor(message).booleanValue();
        if (mandatory && this.returnsCallback == null) {
            message.getMessageProperties().getHeaders().put(RETURN_CORRELATION_KEY, str3);
        }
        doSend(channel, str, str2, message, mandatory, correlationData);
        Message reply;
        if (this.replyTimeout < 0) {
            reply = pendingReply.get();
        } else {
            reply = pendingReply.get(this.replyTimeout, TimeUnit.MILLISECONDS);
        }
        logReceived("Reply: ", reply);
        if (reply == null) {
            replyTimedOut(message.getMessageProperties().getCorrelationId());
        }
        return reply;
    }

    /**
     * Hook invoked when a send-and-receive exchange ends without a reply. The default
     * implementation does nothing; subclasses may override it.
     *
     * @param str the correlation id of the request that received no reply
     */
    protected void replyTimedOut(String str) {
        // Intentionally empty: extension point for subclasses.
    }

    /**
     * Evaluates {@code mandatoryExpression} against the message to decide whether it is
     * published with the mandatory flag.
     *
     * @param message the message used as the evaluation root
     * @return the evaluated value, or {@link Boolean#FALSE} when the expression yields {@code null}
     */
    public Boolean isMandatoryFor(Message message) {
        Boolean evaluated = this.mandatoryExpression.getValue(this.evaluationContext, message, Boolean.class);
        if (evaluated == null) {
            return Boolean.FALSE;
        }
        return evaluated;
    }

    /**
     * Executes the callback against a channel obtained from this template's connection factory.
     *
     * @param channelCallback the work to perform with the channel
     * @param <T> the callback's result type
     * @return the callback's result, possibly {@code null}
     */
    @Override
    @Nullable
    public <T> T execute(ChannelCallback<T> channelCallback) {
        ConnectionFactory defaultFactory = getConnectionFactory();
        return execute(channelCallback, defaultFactory);
    }

    /**
     * Executes the callback against the given connection factory, going through
     * {@code retryTemplate} (with {@code recoveryCallback}) when one is configured.
     *
     * <p>Runtime exceptions from the retry path propagate unchanged; checked exceptions are
     * converted by {@code RabbitExceptionTranslator}.
     *
     * @param channelCallback the work to perform with the channel
     * @param connectionFactory the factory supplying the connection
     * @param <T> the callback's result type
     * @return the callback's result, possibly {@code null}
     */
    @Nullable
    private <T> T execute(ChannelCallback<T> channelCallback, ConnectionFactory connectionFactory) {
        if (this.retryTemplate != null) {
            try {
                return (T) this.retryTemplate.execute(retryContext -> doExecute(channelCallback, connectionFactory), this.recoveryCallback);
            } catch (RuntimeException unchecked) {
                throw unchecked;
            } catch (Exception checked) {
                throw RabbitExceptionTranslator.convertRabbitAccessException(checked);
            }
        }
        return (T) doExecute(channelCallback, connectionFactory);
    }

    /**
     * Obtains a channel, runs the callback on it and releases whatever was acquired.
     *
     * <p>Channel selection, in order:
     * <ol>
     * <li>the thread-bound channel from {@code dedicatedChannels}, consulted only while
     * {@code activeTemplateCallbacks} is positive; such a channel is not released here;</li>
     * <li>a transactional resource holder's channel when the template is channel-transacted;</li>
     * <li>otherwise a fresh connection and channel.</li>
     * </ol>
     *
     * <p>Cleanup runs exactly once, in a {@code finally} block, so a failure inside cleanup is
     * neither mistaken for a callback failure nor followed by a second cleanup attempt.
     *
     * @param channelCallback the work to perform with the channel; must not be {@code null}
     * @param connectionFactory the factory supplying the connection
     * @param <T> the callback's result type
     * @return the callback's result, possibly {@code null}
     * @throws IllegalStateException if no connection or channel could be obtained
     */
    @Nullable
    private <T> T doExecute(ChannelCallback<T> channelCallback, ConnectionFactory connectionFactory) {
        Assert.notNull(channelCallback, "Callback object must not be null");
        Channel channel = null;
        // Skip the thread-local lookup entirely when no invoke() is in progress.
        if (this.activeTemplateCallbacks.get() > 0) {
            channel = this.dedicatedChannels.get();
        }
        boolean invokeScope = channel != null;
        RabbitResourceHolder resourceHolder = null;
        Connection connection = null;
        if (!invokeScope) {
            if (isChannelTransacted()) {
                resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(connectionFactory, true, this.usePublisherConnection);
                channel = resourceHolder.getChannel();
                if (channel == null) {
                    ConnectionFactoryUtils.releaseResources(resourceHolder);
                    throw new IllegalStateException("Resource holder returned a null channel");
                }
            } else {
                connection = ConnectionFactoryUtils.createConnection(connectionFactory, this.usePublisherConnection);
                if (connection == null) {
                    throw new IllegalStateException("Connection factory returned a null connection");
                }
                try {
                    channel = connection.createChannel(false);
                    if (channel == null) {
                        throw new IllegalStateException("Connection returned a null channel");
                    }
                } catch (RuntimeException e) {
                    RabbitUtils.closeConnection(connection);
                    throw e;
                }
            }
        }
        try {
            return invokeAction(channelCallback, connectionFactory, channel);
        } catch (Exception ex) {
            // Guard the holder so a missing holder cannot mask the original failure with an NPE.
            if (isChannelLocallyTransacted(channel) && resourceHolder != null) {
                resourceHolder.rollbackAll();
            }
            throw convertRabbitAccessException(ex);
        } finally {
            cleanUpAfterAction(channel, invokeScope, resourceHolder, connection);
        }
    }

    /**
     * Releases the resources acquired for a single {@code doExecute} call.
     *
     * <p>Nothing is released when {@code z} is set. Otherwise a resource holder, when present, is
     * released via {@code ConnectionFactoryUtils}; without one the channel and connection are
     * closed via {@code RabbitUtils}.
     *
     * @param channel the channel that was used, if any
     * @param z {@code true} when the channel is thread-bound (invoke scope) and must be left open
     * @param rabbitResourceHolder the transactional holder, if one was used
     * @param connection the connection that was created, if any
     */
    private void cleanUpAfterAction(@Nullable Channel channel, boolean z, @Nullable RabbitResourceHolder rabbitResourceHolder, @Nullable Connection connection) {
        if (!z) {
            if (rabbitResourceHolder == null) {
                RabbitUtils.closeChannel(channel);
                RabbitUtils.closeConnection(connection);
            } else {
                ConnectionFactoryUtils.releaseResources(rabbitResourceHolder);
            }
        }
    }

    /**
     * Runs the callback on the channel, first registering this template as a listener on the
     * channel when the connection factory has publisher confirms or returns enabled.
     *
     * @param channelCallback the work to perform
     * @param connectionFactory the factory the channel came from
     * @param channel the channel handed to the callback
     * @param <T> the callback's result type
     * @return the callback's result, possibly {@code null}
     * @throws Exception whatever the callback throws
     */
    @Nullable
    private <T> T invokeAction(ChannelCallback<T> channelCallback, ConnectionFactory connectionFactory, Channel channel) throws Exception {
        boolean callbacksEnabled = isPublisherConfirmsOrReturns(connectionFactory);
        if (callbacksEnabled) {
            addListener(channel);
        }
        if (this.logger.isDebugEnabled()) {
            String callbackName = channelCallback.getClass().getSimpleName();
            this.logger.debug("Executing callback " + callbackName + " on RabbitMQ Channel: " + channel);
        }
        return channelCallback.doInRabbit(channel);
    }

    /**
     * Runs the callback with a dedicated channel so that template operations performed inside it
     * share that channel.
     *
     * <p>For a non-transacted template a new connection and channel are created and the channel
     * is bound to the current thread via {@code dedicatedChannels}; for a transacted template the
     * channel comes from the transactional resource holder. Nested calls on the same thread are
     * rejected. If both confirm callbacks are supplied and the channel has confirms selected, a
     * confirm listener is registered for the duration of the call.
     *
     * <p>{@code activeTemplateCallbacks} is incremented before setup. If setup fails before the
     * callback starts, the counter is decremented and the thread binding is cleared, so a failed
     * call leaves no stale state. Once the callback has started, cleanup runs exactly once in a
     * {@code finally} block.
     *
     * @param operationsCallback the work to perform with this template
     * @param confirmCallback the ack callback, or {@code null}
     * @param confirmCallback2 the nack callback, or {@code null}
     * @param <T> the callback's result type
     * @return the callback's result, possibly {@code null}
     * @throws IllegalStateException on a nested call or when no connection/channel is available
     */
    @Override
    @Nullable
    public <T> T invoke(RabbitOperations.OperationsCallback<T> operationsCallback, @Nullable com.rabbitmq.client.ConfirmCallback confirmCallback, @Nullable com.rabbitmq.client.ConfirmCallback confirmCallback2) {
        Channel boundChannel = this.dedicatedChannels.get();
        Assert.state(boundChannel == null,
                () -> "Nested invoke() calls are not supported; channel '" + boundChannel + "' is already associated with this thread");
        this.activeTemplateCallbacks.incrementAndGet();
        RabbitResourceHolder resourceHolder = null;
        Connection connection = null;
        Channel invokeChannel;
        ConfirmListener confirmListener;
        boolean prepared = false;
        try {
            ConnectionFactory connectionFactory = getConnectionFactory();
            if (isChannelTransacted()) {
                resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(connectionFactory, true, this.usePublisherConnection);
                invokeChannel = resourceHolder.getChannel();
                if (invokeChannel == null) {
                    ConnectionFactoryUtils.releaseResources(resourceHolder);
                    throw new IllegalStateException("Resource holder returned a null channel");
                }
            } else {
                if (this.usePublisherConnection && connectionFactory.getPublisherConnectionFactory() != null) {
                    connectionFactory = connectionFactory.getPublisherConnectionFactory();
                }
                connection = connectionFactory.createConnection();
                if (connection == null) {
                    throw new IllegalStateException("Connection factory returned a null connection");
                }
                try {
                    invokeChannel = connection.createChannel(false);
                    if (invokeChannel == null) {
                        throw new IllegalStateException("Connection returned a null channel");
                    }
                    if (!connectionFactory.isPublisherConfirms()) {
                        RabbitUtils.setPhysicalCloseRequired(invokeChannel, true);
                    }
                    this.dedicatedChannels.set(invokeChannel);
                } catch (RuntimeException e) {
                    RabbitUtils.closeConnection(connection);
                    throw e;
                }
            }
            confirmListener = addConfirmListener(confirmCallback, confirmCallback2, invokeChannel);
            prepared = true;
        } finally {
            if (!prepared) {
                // Setup failed before the callback ran: undo the bookkeeping done so far.
                this.activeTemplateCallbacks.decrementAndGet();
                this.dedicatedChannels.remove();
            }
        }
        try {
            return operationsCallback.doInRabbit(this);
        } finally {
            cleanUpAfterAction(resourceHolder, connection, invokeChannel, confirmListener);
        }
    }

    /**
     * Registers a confirm listener on the channel, but only when both callbacks are supplied and
     * the channel is a {@code ChannelProxy} reporting that confirms are selected.
     *
     * @param confirmCallback the ack callback, or {@code null}
     * @param confirmCallback2 the nack callback, or {@code null}
     * @param channel the channel to register on
     * @return the registered listener, or {@code null} when none was registered
     */
    @Nullable
    private ConfirmListener addConfirmListener(@Nullable com.rabbitmq.client.ConfirmCallback confirmCallback, @Nullable com.rabbitmq.client.ConfirmCallback confirmCallback2, Channel channel) {
        if (confirmCallback == null || confirmCallback2 == null) {
            return null;
        }
        if (!(channel instanceof ChannelProxy) || !((ChannelProxy) channel).isConfirmSelected()) {
            return null;
        }
        return channel.addConfirmListener(confirmCallback, confirmCallback2);
    }

    /**
     * Undoes the setup performed by {@code invoke}: removes the confirm listener (if any),
     * decrements {@code activeTemplateCallbacks}, clears the thread-bound channel and releases
     * either the resource holder or the channel and connection.
     *
     * <p>The bookkeeping and resource release sit in a {@code finally} block so they still happen
     * if removing the confirm listener throws; otherwise the counter would stay incremented and
     * the channel would remain bound to the thread.
     *
     * @param rabbitResourceHolder the transactional holder, if one was used
     * @param connection the connection that was created, if any
     * @param channel the channel that was used, if any
     * @param confirmListener the listener registered for the call, if any
     */
    private void cleanUpAfterAction(@Nullable RabbitResourceHolder rabbitResourceHolder, @Nullable Connection connection, @Nullable Channel channel, @Nullable ConfirmListener confirmListener) {
        try {
            if (confirmListener != null && channel != null) {
                channel.removeConfirmListener(confirmListener);
            }
        } finally {
            this.activeTemplateCallbacks.decrementAndGet();
            this.dedicatedChannels.remove();
            if (rabbitResourceHolder != null) {
                ConnectionFactoryUtils.releaseResources(rabbitResourceHolder);
            } else {
                RabbitUtils.closeChannel(channel);
                RabbitUtils.closeConnection(connection);
            }
        }
    }

    /**
     * Delegates to {@code Channel.waitForConfirms} on the channel bound to the current thread.
     * Only usable inside an {@code invoke} operation.
     *
     * @param j the timeout passed to the channel
     * @return the value returned by the channel
     * @throws IllegalStateException if no channel is bound to the current thread
     */
    @Override
    public boolean waitForConfirms(long j) {
        Channel boundChannel = this.dedicatedChannels.get();
        Assert.state(boundChannel != null, "This operation is only available within the scope of an invoke operation");
        try {
            return boundChannel.waitForConfirms(j);
        } catch (TimeoutException timeout) {
            throw RabbitExceptionTranslator.convertRabbitAccessException(timeout);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw RabbitExceptionTranslator.convertRabbitAccessException(interrupted);
        }
    }

    /**
     * Delegates to {@code Channel.waitForConfirmsOrDie} on the channel bound to the current
     * thread. Only usable inside an {@code invoke} operation.
     *
     * @param j the timeout passed to the channel
     * @throws IllegalStateException if no channel is bound to the current thread
     */
    @Override
    public void waitForConfirmsOrDie(long j) {
        Channel boundChannel = this.dedicatedChannels.get();
        Assert.state(boundChannel != null, "This operation is only available within the scope of an invoke operation");
        try {
            boundChannel.waitForConfirmsOrDie(j);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw RabbitExceptionTranslator.convertRabbitAccessException(interrupted);
        } catch (IOException | TimeoutException failure) {
            throw RabbitExceptionTranslator.convertRabbitAccessException(failure);
        }
    }

    /**
     * Tells whether the connection factory has publisher confirms or publisher returns enabled.
     *
     * @param connectionFactory the factory to inspect
     * @return {@code true} if either feature is enabled
     */
    private boolean isPublisherConfirmsOrReturns(ConnectionFactory connectionFactory) {
        if (connectionFactory.isPublisherConfirms()) {
            return true;
        }
        return connectionFactory.isPublisherReturns();
    }

    /**
     * Publishes a message on the given channel.
     *
     * <p>Steps, in order: resolve default exchange/routing key; when {@code z} is set, tag the
     * message with this template's UUID under
     * {@code PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY}; apply the before-publish
     * post-processors; set up confirm tracking; apply the user-id expression; publish; and commit
     * if the channel is locally transacted.
     *
     * <p>The user id is checked and set on the properties of the message that is actually
     * published, i.e. the one returned by the post-processors. Using the properties captured
     * before post-processing would lose the user id whenever a post-processor returns a message
     * with a different properties object.
     *
     * @param channel the channel to publish on
     * @param str the exchange, or {@code null} for the template default
     * @param str2 the routing key, or {@code null} for the template default
     * @param message the message to publish
     * @param z the mandatory flag passed to the publish call
     * @param correlationData optional publisher-confirm correlation data
     */
    public void doSend(Channel channel, String str, String str2, Message message, boolean z, @Nullable CorrelationData correlationData) {
        String exchangeToUse = nullSafeExchange(str);
        String routingKeyToUse = nullSafeRoutingKey(str2);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Original message to publish: " + message);
        }
        Message messageToUse = message;
        if (z) {
            messageToUse.getMessageProperties().getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
        }
        if (this.beforePublishPostProcessors != null) {
            for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
                messageToUse = processor.postProcessMessage(messageToUse, correlationData, exchangeToUse, routingKeyToUse);
            }
        }
        setupConfirm(channel, messageToUse, correlationData);
        if (this.userIdExpression != null) {
            // Read the properties only now: a post-processor may have replaced the message.
            MessageProperties publishedProperties = messageToUse.getMessageProperties();
            if (publishedProperties.getUserId() == null) {
                String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
                if (userId != null) {
                    publishedProperties.setUserId(userId);
                }
            }
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Publishing message [" + messageToUse + "] on exchange [" + exchangeToUse + "], routingKey = [" + routingKeyToUse + "]");
        }
        observeTheSend(channel, messageToUse, z, exchangeToUse, routingKeyToUse);
        if (isChannelLocallyTransacted(channel)) {
            RabbitUtils.commitIfNecessary(channel);
        }
    }

    /**
     * Publishes the message inside an observation created from
     * {@code RabbitTemplateObservation.TEMPLATE_OBSERVATION}.
     *
     * <p>On the first call with observation enabled, the observation registry is obtained from
     * the application context and the fact is remembered in {@code observationRegistryObtained}.
     *
     * @param channel the channel to publish on
     * @param message the message to publish
     * @param z the mandatory flag
     * @param str the exchange
     * @param str2 the routing key
     */
    protected void observeTheSend(Channel channel, Message message, boolean z, String str, String str2) {
        if (this.observationEnabled && !this.observationRegistryObtained) {
            obtainObservationRegistry(this.applicationContext);
            this.observationRegistryObtained = true;
        }
        RabbitTemplateObservation.TEMPLATE_OBSERVATION
                .observation(
                        this.observationConvention,
                        RabbitTemplateObservation.DefaultRabbitTemplateObservationConvention.INSTANCE,
                        () -> new RabbitMessageSenderContext(message, this.beanName, str + "/" + str2),
                        getObservationRegistry())
                .observe(() -> {
                    sendToRabbit(channel, str, str2, z, message);
                });
    }

    /**
     * Returns the given exchange, falling back to the template's default exchange when it is
     * {@code null}.
     *
     * @param str the requested exchange, possibly {@code null}
     * @return the exchange to use
     */
    public String nullSafeExchange(String str) {
        return str != null ? str : this.exchange;
    }

    /**
     * Returns the given routing key, falling back to the template's default routing key when it
     * is {@code null}.
     *
     * @param str the requested routing key, possibly {@code null}
     * @return the routing key to use
     */
    public String nullSafeRoutingKey(String str) {
        return str != null ? str : this.routingKey;
    }

    /**
     * Converts the message properties and calls {@code basicPublish} on the channel, translating
     * any {@link IOException} through {@code RabbitExceptionTranslator}.
     *
     * @param channel the channel to publish on
     * @param str the exchange
     * @param str2 the routing key
     * @param z the mandatory flag
     * @param message the message whose properties and body are published
     */
    protected void sendToRabbit(Channel channel, String str, String str2, boolean z, Message message) {
        try {
            AMQP.BasicProperties amqpProperties =
                    this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding);
            byte[] body = message.getBody();
            channel.basicPublish(str, str2, z, amqpProperties, body);
        } catch (IOException ioFailure) {
            throw RabbitExceptionTranslator.convertRabbitAccessException(ioFailure);
        }
    }

    /**
     * Prepares publisher-confirm tracking for the message about to be published.
     *
     * <p>When the channel is a {@code PublisherCallbackChannel} and either the channel proxy
     * reports publisher confirms or a confirm callback is configured, the next publish sequence
     * number is stored on the message, a {@code PendingConfirm} is registered with the channel
     * and, if the (post-processed) correlation data has a non-blank id, that id is written to
     * the {@code RETURNED_MESSAGE_CORRELATION_KEY} header. A non-positive sequence number only
     * produces a debug log.
     *
     * <p>Otherwise, if the channel proxy has confirms selected, only the sequence number is
     * stored on the message.
     *
     * @param channel the channel about to publish
     * @param message the message about to be published
     * @param correlationData the caller-supplied correlation data, or {@code null}
     */
    private void setupConfirm(Channel channel, Message message, @Nullable CorrelationData correlationData) {
        boolean publisherConfirms = (channel instanceof ChannelProxy) && ((ChannelProxy) channel).isPublisherConfirms();
        if ((publisherConfirms || this.confirmCallback != null) && (channel instanceof PublisherCallbackChannel)) {
            PublisherCallbackChannel callbackChannel = (PublisherCallbackChannel) channel;
            long sequenceNumber = channel.getNextPublishSeqNo();
            if (sequenceNumber > 0) {
                CorrelationData effectiveData = correlationData;
                if (this.correlationDataPostProcessor != null) {
                    effectiveData = this.correlationDataPostProcessor.postProcess(message, correlationData);
                }
                message.getMessageProperties().setPublishSequenceNumber(sequenceNumber);
                callbackChannel.addPendingConfirm(this, sequenceNumber, new PendingConfirm(effectiveData, System.currentTimeMillis()));
                if (effectiveData != null && StringUtils.hasText(effectiveData.getId())) {
                    message.getMessageProperties().setHeader(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY, effectiveData.getId());
                }
            } else {
                this.logger.debug("Factory does not have confirms enabled");
            }
        } else if ((channel instanceof ChannelProxy) && ((ChannelProxy) channel).isConfirmSelected()) {
            message.getMessageProperties().setPublishSequenceNumber(channel.getNextPublishSeqNo());
        }
    }

    /**
     * Tells whether the template is channel-transacted while the given channel is not reported
     * as transactional by {@code ConnectionFactoryUtils} for this template's connection factory.
     *
     * @param channel the channel to check
     * @return {@code true} if the channel is transacted locally by this template
     */
    protected boolean isChannelLocallyTransacted(Channel channel) {
        if (!isChannelTransacted()) {
            return false;
        }
        return !ConnectionFactoryUtils.isChannelTransactional(channel, getConnectionFactory());
    }

    /**
     * Builds a {@link Message} from a consumer delivery. No message count is available, so
     * {@code -1} is passed to {@code buildMessage}.
     *
     * @param delivery the received delivery
     * @return the converted message
     */
    private Message buildMessageFromDelivery(Delivery delivery) {
        Envelope envelope = delivery.getEnvelope();
        AMQP.BasicProperties properties = delivery.getProperties();
        byte[] body = delivery.getBody();
        return buildMessage(envelope, properties, body, -1);
    }

    /**
     * Builds a {@link Message} from a {@link GetResponse}, carrying over the response's message
     * count.
     *
     * @param getResponse the response to convert
     * @return the converted message
     */
    private Message buildMessageFromResponse(GetResponse getResponse) {
        Envelope envelope = getResponse.getEnvelope();
        AMQP.BasicProperties properties = getResponse.getProps();
        byte[] body = getResponse.getBody();
        int messageCount = getResponse.getMessageCount();
        return buildMessage(envelope, properties, body, messageCount);
    }

    /**
     * Converts raw AMQP data into a {@link Message} and applies the after-receive
     * post-processors, if any.
     *
     * @param envelope the delivery envelope
     * @param basicProperties the AMQP properties
     * @param bArr the message body
     * @param i the message count; stored on the properties only when non-negative
     * @return the converted, post-processed message
     */
    private Message buildMessage(Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr, int i) {
        MessageProperties converted = this.messagePropertiesConverter.toMessageProperties(basicProperties, envelope, this.encoding);
        if (i >= 0) {
            converted.setMessageCount(Integer.valueOf(i));
        }
        Message result = new Message(bArr, converted);
        if (this.afterReceivePostProcessors == null) {
            return result;
        }
        for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
            result = processor.postProcessMessage(result);
        }
        return result;
    }

    /**
     * Returns the configured message converter, failing if none is set.
     *
     * @return the non-null message converter
     * @throws IllegalStateException (as {@code AmqpIllegalStateException}) if no converter is configured
     */
    private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
        MessageConverter configured = getMessageConverter();
        if (configured != null) {
            return configured;
        }
        throw new AmqpIllegalStateException("No 'messageConverter' specified. Check configuration of RabbitTemplate.");
    }

    /**
     * Returns the configured message converter as a {@code SmartMessageConverter}.
     *
     * @return the converter, cast to {@code SmartMessageConverter}
     * @throws IllegalStateException if no converter is configured or it is not a
     * {@code SmartMessageConverter}
     */
    private SmartMessageConverter getRequiredSmartMessageConverter() throws IllegalStateException {
        MessageConverter converter = getRequiredMessageConverter();
        boolean isSmart = converter instanceof SmartMessageConverter;
        Assert.state(isSmart, "template's message converter must be a SmartMessageConverter");
        return (SmartMessageConverter) converter;
    }

    /**
     * Returns the default receive queue, failing if none is configured.
     *
     * @return the non-null default receive queue name
     * @throws IllegalStateException (as {@code AmqpIllegalStateException}) if no queue is configured
     */
    private String getRequiredQueue() throws IllegalStateException {
        String queueName = this.defaultReceiveQueue;
        if (queueName != null) {
            return queueName;
        }
        throw new AmqpIllegalStateException("No 'queue' specified. Check configuration of RabbitTemplate.");
    }

    /**
     * Determines where a reply to the given request should be sent: the request's reply-to
     * address when present, otherwise an address built from the template's default exchange and
     * routing key.
     *
     * @param message the request message
     * @return the address to reply to
     * @throws AmqpException if the request has no reply-to and no default exchange is set
     */
    private Address getReplyToAddress(Message message) throws AmqpException {
        Address requested = message.getMessageProperties().getReplyToAddress();
        if (requested != null) {
            return requested;
        }
        if (this.exchange == null) {
            throw new AmqpException("Cannot determine ReplyTo message property value: Request message does not contain reply-to property, and no default Exchange was set.");
        }
        return new Address(this.exchange, this.routingKey);
    }

    /**
     * Registers this template as a publisher-callback listener on the channel, once per
     * underlying channel.
     *
     * <p>Registration is tracked in {@code publisherConfirmChannels}, keyed by the proxy's target
     * channel when the channel is a {@code ChannelProxy}; the listener is added only when the key
     * was not already present.
     *
     * @param channel the channel to listen on
     * @throws IllegalStateException if the channel is not a {@code PublisherCallbackChannel}
     */
    public void addListener(Channel channel) {
        if (!(channel instanceof PublisherCallbackChannel)) {
            throw new IllegalStateException("Channel does not support confirms or returns; is the connection factory configured for confirms or returns?");
        }
        Channel registrationKey = channel instanceof ChannelProxy ? ((ChannelProxy) channel).getTargetChannel() : channel;
        if (this.publisherConfirmChannels.putIfAbsent(registrationKey, this) != null) {
            return;
        }
        ((PublisherCallbackChannel) channel).addListener(this);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Added publisher confirm channel: " + channel + " to map, size now " + this.publisherConfirmChannels.size());
        }
    }

    /**
     * Forwards a publisher confirm to the configured confirm callback, if there is one.
     *
     * @param pendingConfirm the confirm being reported; supplies the correlation data and cause
     * @param z the ack flag forwarded to the callback
     */
    @Override
    public void handleConfirm(PendingConfirm pendingConfirm, boolean z) {
        if (this.confirmCallback == null) {
            return;
        }
        this.confirmCallback.confirm(pendingConfirm.getCorrelationData(), z, pendingConfirm.getCause());
    }

    /**
     * Handles a returned message.
     *
     * <p>With a configured {@code returnsCallback} the return is passed to it. Without one, the
     * {@code RETURN_CORRELATION_KEY} header is removed and used to look up a pending reply; if
     * found, the pending reply is completed with an {@link AmqpMessageReturnedException}.
     * When no callback can be determined, a warning is logged and nothing else happens.
     *
     * @param r10 the returned message as reported by the client library
     */
    @Override
    public void handleReturn(Return r10) {
        ReturnsCallback callback = this.returnsCallback;
        if (callback == null) {
            Object messageTag = r10.getProperties().getHeaders().remove(RETURN_CORRELATION_KEY);
            if (messageTag == null) {
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn("Returned message but no callback available");
                }
            } else {
                PendingReply pendingReply = this.replyHolder.get(messageTag.toString());
                if (pendingReply == null) {
                    if (this.logger.isWarnEnabled()) {
                        this.logger.warn("Returned request message but caller has timed out");
                    }
                } else {
                    callback = returnedMessage ->
                            pendingReply.returned(new AmqpMessageReturnedException("Message returned", returnedMessage));
                }
            }
        }
        if (callback == null) {
            return;
        }
        r10.getProperties().getHeaders().remove(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY);
        byte[] body = r10.getBody();
        MessageProperties properties = this.messagePropertiesConverter.toMessageProperties(r10.getProperties(), null, this.encoding);
        Message returned = new Message(body, properties);
        callback.returnedMessage(new ReturnedMessage(returned, r10.getReplyCode(), r10.getReplyText(), r10.getExchange(), r10.getRoutingKey()));
    }

    /**
     * Reports whether this template wants to receive confirms.
     *
     * @return {@code true} when a confirm callback is configured
     */
    @Override
    public boolean isConfirmListener() {
        boolean hasConfirmCallback = this.confirmCallback != null;
        return hasConfirmCallback;
    }

    /**
     * Reports whether this template wants to receive returns.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isReturnListener() {
        return true;
    }

    /**
     * Removes the channel from {@code publisherConfirmChannels}.
     *
     * @param channel the channel to remove
     */
    @Override
    public void revoke(Channel channel) {
        this.publisherConfirmChannels.remove(channel);
        if (!this.logger.isDebugEnabled()) {
            return;
        }
        this.logger.debug("Removed publisher confirm channel: " + channel + " from map, size now " + this.publisherConfirmChannels.size());
    }

    /**
     * Returns this template's {@code uuid} field, the same value {@code doSend} writes into the
     * {@code RETURN_LISTENER_CORRELATION_KEY} header of mandatory messages.
     *
     * @return this template's UUID
     */
    @Override
    public String getUUID() {
        return this.uuid;
    }

    /**
     * Receives a reply and hands it to the caller waiting on the matching pending reply.
     *
     * <p>The lookup key is the reply's correlation value (standard property, or the
     * {@code correlationKey} header when configured). When {@code useChannelForCorrelation} is
     * enabled and a pending reply is registered under the channel itself, the channel is used as
     * the key instead. The key is therefore held as an {@code Object}: it may be a correlation
     * value or a {@link Channel}.
     *
     * @param message the reply message
     * @param channel the channel the reply arrived on, or {@code null}
     * @throws AmqpRejectAndDontRequeueException if the reply has no correlation key or no caller
     * is waiting for it any more
     */
    @Override
    public void onMessage(Message message, @Nullable Channel channel) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Message received " + message);
        }
        Object messageTag;
        if (this.correlationKey == null) {
            messageTag = message.getMessageProperties().getCorrelationId();
        } else {
            messageTag = message.getMessageProperties().getHeaders().get(this.correlationKey);
        }
        if (this.useChannelForCorrelation && channel != null && this.replyHolder.containsKey(channel)) {
            messageTag = channel;
        }
        if (messageTag == null) {
            throw new AmqpRejectAndDontRequeueException("No correlation header in reply");
        }
        PendingReply pendingReply = this.replyHolder.get(messageTag);
        if (pendingReply == null) {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Reply received after timeout for " + messageTag);
            }
            throw new AmqpRejectAndDontRequeueException("Reply received after timeout");
        }
        restoreProperties(message, pendingReply);
        pendingReply.reply(message);
    }

    /**
     * Writes the values saved in the pending reply back onto the reply message: the saved
     * correlation value (unless user correlation ids are in use) and the saved reply-to.
     *
     * <p>With a custom {@code correlationKey}, a {@code null} saved value removes the header
     * rather than setting it.
     *
     * @param message the reply message whose properties are modified in place
     * @param pendingReply the holder of the saved values
     */
    private void restoreProperties(Message message, PendingReply pendingReply) {
        MessageProperties props = message.getMessageProperties();
        if (!this.userCorrelationId) {
            String savedCorrelation = pendingReply.getSavedCorrelation();
            if (this.correlationKey == null) {
                props.setCorrelationId(savedCorrelation);
            } else if (savedCorrelation == null) {
                props.getHeaders().remove(this.correlationKey);
            } else {
                props.setHeader(this.correlationKey, savedCorrelation);
            }
        }
        String savedReplyTo = pendingReply.getSavedReplyTo();
        props.setReplyTo(savedReplyTo);
        if (savedReplyTo != null && this.logger.isDebugEnabled()) {
            this.logger.debug("Restored replyTo to " + savedReplyTo);
        }
    }

    /**
     * Starts a consumer on the queue that completes the given future with the first delivery.
     *
     * <p>The channel's QoS is set to {@code 1}. The future is completed exceptionally with a
     * {@code ConsumerCancelledException} if the consumer is cancelled. If the consume-ok
     * notification does not arrive within the timeout, the proxy's target channel (if any) is
     * closed, the future is completed exceptionally with a
     * {@code ConsumeOkNotReceivedException} and the channel is flagged for physical close.
     *
     * @param str the queue to consume from
     * @param channel the channel to consume on
     * @param completableFuture the future that receives the delivery or the failure
     * @param j how long to wait for consume-ok, in milliseconds
     * @return the consumer that was registered
     * @throws IOException if a channel operation fails
     * @throws TimeoutException declared for closing the target channel
     * @throws InterruptedException if interrupted while waiting for consume-ok
     */
    private DefaultConsumer createConsumer(final String str, Channel channel, final CompletableFuture<Delivery> completableFuture, long j) throws IOException, TimeoutException, InterruptedException {
        channel.basicQos(1);
        final CountDownLatch consumeOkLatch = new CountDownLatch(1);
        TemplateConsumer consumer = new TemplateConsumer(channel) {
            @Override
            public void handleCancel(String consumerTag) {
                completableFuture.completeExceptionally(new ConsumerCancelledException());
            }

            @Override
            public void handleConsumeOk(String consumerTag) {
                super.handleConsumeOk(consumerTag);
                consumeOkLatch.countDown();
            }

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) {
                completableFuture.complete(new Delivery(consumerTag, envelope, basicProperties, body, str));
            }
        };
        channel.basicConsume(str, false, this.consumerArgs, (Consumer) consumer);
        boolean consumeOkReceived = consumeOkLatch.await(j, TimeUnit.MILLISECONDS);
        if (!consumeOkReceived) {
            if (channel instanceof ChannelProxy) {
                ((ChannelProxy) channel).getTargetChannel().close();
            }
            completableFuture.completeExceptionally(new ConsumeOkNotReceivedException("Blocking receive, consumer failed to consume within " + j + " ms: " + completableFuture));
            RabbitUtils.setPhysicalCloseRequired(channel, true);
        }
        return consumer;
    }
}
