package org.springframework.messaging.core;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.Assert;

/* loaded from: input_file:WEB-INF/lib/spring-messaging-5.2.2.RELEASE.jar:org/springframework/messaging/core/GenericMessagingTemplate.class */
public class GenericMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel> implements BeanFactoryAware {
    public static final String DEFAULT_SEND_TIMEOUT_HEADER = "sendTimeout";
    public static final String DEFAULT_RECEIVE_TIMEOUT_HEADER = "receiveTimeout";
    private volatile long sendTimeout = -1;
    private volatile long receiveTimeout = -1;
    private String sendTimeoutHeader = DEFAULT_SEND_TIMEOUT_HEADER;
    private String receiveTimeoutHeader = DEFAULT_RECEIVE_TIMEOUT_HEADER;
    private volatile boolean throwExceptionOnLateReply = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/spring-messaging-5.2.2.RELEASE.jar:org/springframework/messaging/core/GenericMessagingTemplate$TemporaryReplyChannel.class */
    public static final class TemporaryReplyChannel implements PollableChannel {
        private final Log logger = LogFactory.getLog(TemporaryReplyChannel.class);
        private final CountDownLatch replyLatch = new CountDownLatch(1);
        private final boolean throwExceptionOnLateReply;

        @Nullable
        private volatile Message<?> replyMessage;
        private volatile boolean hasReceived;
        private volatile boolean hasTimedOut;
        private volatile boolean hasSendFailed;

        TemporaryReplyChannel(boolean z) {
            this.throwExceptionOnLateReply = z;
        }

        public void setSendFailed(boolean z) {
            this.hasSendFailed = z;
        }

        @Override // org.springframework.messaging.PollableChannel
        @Nullable
        public Message<?> receive() {
            return receive(-1L);
        }

        @Override // org.springframework.messaging.PollableChannel
        @Nullable
        public Message<?> receive(long j) {
            try {
                if (j < 0) {
                    this.replyLatch.await();
                    this.hasReceived = true;
                } else if (this.replyLatch.await(j, TimeUnit.MILLISECONDS)) {
                    this.hasReceived = true;
                } else {
                    this.hasTimedOut = true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return this.replyMessage;
        }

        @Override // org.springframework.messaging.MessageChannel
        public boolean send(Message<?> message) {
            return send(message, -1L);
        }

        @Override // org.springframework.messaging.MessageChannel
        public boolean send(Message<?> message, long j) {
            this.replyMessage = message;
            boolean z = this.hasReceived;
            this.replyLatch.countDown();
            String str = null;
            if (this.hasTimedOut) {
                str = "Reply message received but the receiving thread has exited due to a timeout";
            } else if (z) {
                str = "Reply message received but the receiving thread has already received a reply";
            } else if (this.hasSendFailed) {
                str = "Reply message received but the receiving thread has exited due to an exception while sending the request message";
            }
            if (str == null) {
                return true;
            }
            if (this.logger.isWarnEnabled()) {
                this.logger.warn(str + ": " + message);
            }
            if (this.throwExceptionOnLateReply) {
                throw new MessageDeliveryException(message, str);
            }
            return true;
        }
    }

    public void setSendTimeout(long j) {
        this.sendTimeout = j;
    }

    public long getSendTimeout() {
        return this.sendTimeout;
    }

    public void setReceiveTimeout(long j) {
        this.receiveTimeout = j;
    }

    public long getReceiveTimeout() {
        return this.receiveTimeout;
    }

    public void setSendTimeoutHeader(String str) {
        Assert.notNull(str, "'sendTimeoutHeader' cannot be null");
        this.sendTimeoutHeader = str;
    }

    public String getSendTimeoutHeader() {
        return this.sendTimeoutHeader;
    }

    public void setReceiveTimeoutHeader(String str) {
        Assert.notNull(str, "'receiveTimeoutHeader' cannot be null");
        this.receiveTimeoutHeader = str;
    }

    public String getReceiveTimeoutHeader() {
        return this.receiveTimeoutHeader;
    }

    public void setThrowExceptionOnLateReply(boolean z) {
        this.throwExceptionOnLateReply = z;
    }

    @Override // org.springframework.beans.factory.BeanFactoryAware
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        setDestinationResolver(new BeanFactoryMessageChannelDestinationResolver(beanFactory));
    }

    protected final void doSend(MessageChannel messageChannel, Message<?> message) {
        doSend(messageChannel, message, sendTimeout(message));
    }

    protected final void doSend(MessageChannel messageChannel, Message<?> message, long j) {
        Assert.notNull(messageChannel, "MessageChannel is required");
        Message<?> message2 = message;
        MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, (Class<MessageHeaderAccessor>) MessageHeaderAccessor.class);
        if (accessor != null && accessor.isMutable()) {
            accessor.removeHeader(this.sendTimeoutHeader);
            accessor.removeHeader(this.receiveTimeoutHeader);
            accessor.setImmutable();
        } else if (message.getHeaders().containsKey(this.sendTimeoutHeader) || message.getHeaders().containsKey(this.receiveTimeoutHeader)) {
            message2 = MessageBuilder.fromMessage(message).setHeader(this.sendTimeoutHeader, null).setHeader(this.receiveTimeoutHeader, null).build();
        }
        if (!(j >= 0 ? messageChannel.send(message2, j) : messageChannel.send(message2))) {
            throw new MessageDeliveryException(message, "Failed to send message to channel '" + messageChannel + "' within timeout: " + j);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.messaging.core.AbstractMessageReceivingTemplate
    @Nullable
    public final Message<?> doReceive(MessageChannel messageChannel) {
        return doReceive(messageChannel, this.receiveTimeout);
    }

    @Nullable
    protected final Message<?> doReceive(MessageChannel messageChannel, long j) {
        Assert.notNull(messageChannel, "MessageChannel is required");
        Assert.state(messageChannel instanceof PollableChannel, "A PollableChannel is required to receive messages");
        Message<?> receive = j >= 0 ? ((PollableChannel) messageChannel).receive(j) : ((PollableChannel) messageChannel).receive();
        if (receive == null && this.logger.isTraceEnabled()) {
            this.logger.trace("Failed to receive message from channel '" + messageChannel + "' within timeout: " + j);
        }
        return receive;
    }

    @Nullable
    protected final Message<?> doSendAndReceive(MessageChannel messageChannel, Message<?> message) {
        Assert.notNull(messageChannel, "'channel' is required");
        Object replyChannel = message.getHeaders().getReplyChannel();
        Object errorChannel = message.getHeaders().getErrorChannel();
        long sendTimeout = sendTimeout(message);
        long receiveTimeout = receiveTimeout(message);
        TemporaryReplyChannel temporaryReplyChannel = new TemporaryReplyChannel(this.throwExceptionOnLateReply);
        try {
            doSend(messageChannel, MessageBuilder.fromMessage(message).setReplyChannel(temporaryReplyChannel).setHeader(this.sendTimeoutHeader, null).setHeader(this.receiveTimeoutHeader, null).setErrorChannel(temporaryReplyChannel).build(), sendTimeout);
            Message<?> doReceive = doReceive(temporaryReplyChannel, receiveTimeout);
            if (doReceive != null) {
                doReceive = MessageBuilder.fromMessage(doReceive).setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel).setHeader(MessageHeaders.ERROR_CHANNEL, errorChannel).build();
            }
            return doReceive;
        } catch (RuntimeException e) {
            temporaryReplyChannel.setSendFailed(true);
            throw e;
        }
    }

    private long sendTimeout(Message<?> message) {
        Long headerToLong = headerToLong(message.getHeaders().get(this.sendTimeoutHeader));
        return headerToLong != null ? headerToLong.longValue() : this.sendTimeout;
    }

    private long receiveTimeout(Message<?> message) {
        Long headerToLong = headerToLong(message.getHeaders().get(this.receiveTimeoutHeader));
        return headerToLong != null ? headerToLong.longValue() : this.receiveTimeout;
    }

    @Nullable
    private Long headerToLong(@Nullable Object obj) {
        if (obj instanceof Number) {
            return Long.valueOf(((Number) obj).longValue());
        }
        if (obj instanceof String) {
            return Long.valueOf(Long.parseLong((String) obj));
        }
        return null;
    }

    @Override // org.springframework.messaging.core.AbstractMessagingTemplate
    @Nullable
    protected /* bridge */ /* synthetic */ Message doSendAndReceive(Object obj, Message message) {
        return doSendAndReceive((MessageChannel) obj, (Message<?>) message);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.messaging.core.AbstractMessageSendingTemplate
    public /* bridge */ /* synthetic */ void doSend(Object obj, Message message) {
        doSend((MessageChannel) obj, (Message<?>) message);
    }
}
