package org.springframework.integration.handler;

import java.io.Serializable;
import java.time.Instant;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.aopalliance.aop.Advice;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.EvaluationException;
import org.springframework.expression.Expression;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.MessageStore;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.DestinationResolver;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

/**
 * A message handler that postpones a message for a computed delay and then
 * releases it through the regular reply-producing flow of the superclass.
 *
 * <p>A message whose delay is greater than zero is wrapped in a
 * {@link DelayedMessageWrapper}, added to the configured {@link MessageGroupStore}
 * under this handler's message group id, and scheduled for release on the
 * {@link TaskScheduler}. A message whose delay is zero or negative is passed
 * through immediately.
 *
 * <p>When the release of a delayed message fails, the failure is either sent to
 * the configured error channel or, when there is none, the release is retried
 * until {@code maxAttempts} is reached. The same retry applies when sending to
 * the error channel itself fails.
 */
@IntegrationManagedResource
@ManagedResource
public class DelayHandler extends AbstractReplyProducingMessageHandler implements DelayHandlerManagement, ApplicationListener<ContextRefreshedEvent> {
    /** Default number of release attempts before a message is discarded. */
    public static final int DEFAULT_MAX_ATTEMPTS = 5;

    /** Default pause, in milliseconds, between two release attempts. */
    public static final long DEFAULT_RETRY_DELAY = 1000;

    private final String messageGroupId;

    /** Release-attempt counters, keyed by the identity hex string of the message instance. */
    private final ConcurrentMap<String, AtomicInteger> deliveries = new ConcurrentHashMap<>();

    private final AtomicBoolean initialized = new AtomicBoolean();

    /**
     * Guards the check-then-remove sequence against a {@link SimpleMessageStore}.
     * A dedicated object is used instead of the group id string because a String
     * handed in by the caller may be shared with unrelated code.
     */
    private final Object storeMonitor = new Object();

    private long defaultDelay;
    private Expression delayExpression;
    private boolean ignoreExpressionFailures = true;
    private MessageGroupStore messageStore;
    private List<Advice> delayedAdviceChain;
    private MessageHandler releaseHandler = new ReleaseMessageHandler();
    private EvaluationContext evaluationContext;
    private MessageChannel delayedMessageErrorChannel;
    private String delayedMessageErrorChannelName;
    private int maxAttempts = DEFAULT_MAX_ATTEMPTS;
    private long retryDelay = DEFAULT_RETRY_DELAY;

    /**
     * Serializable payload that carries the original message together with the
     * time (epoch milliseconds) at which the delay was requested.
     *
     * <p>Equality and hash code are derived from the original message only; the
     * request date does not take part.
     */
    public static final class DelayedMessageWrapper implements Serializable {
        private static final long serialVersionUID = -4739802369074947045L;
        private final long requestDate;
        private final Message<?> original;

        DelayedMessageWrapper(Message<?> original, long requestDate) {
            this.original = original;
            this.requestDate = requestDate;
        }

        /**
         * @return the epoch-millisecond timestamp stored at construction time
         */
        public long getRequestDate() {
            return this.requestDate;
        }

        /**
         * @return the wrapped message
         */
        public Message<?> getOriginal() {
            return this.original;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return this.original.equals(((DelayedMessageWrapper) obj).original);
        }

        @Override
        public int hashCode() {
            return this.original.hashCode();
        }
    }

    /**
     * Target of the (optionally advised) release call; simply delegates to
     * {@link DelayHandler#doReleaseMessage(Message)} of the enclosing handler.
     */
    public class ReleaseMessageHandler implements MessageHandler {
        ReleaseMessageHandler() {
        }

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            DelayHandler.this.doReleaseMessage(message);
        }
    }

    /**
     * Create a handler that stores delayed messages under the given group id.
     *
     * @param messageGroupId the message group id; must not be {@code null}
     * @throws IllegalArgumentException if {@code messageGroupId} is {@code null}
     */
    public DelayHandler(String messageGroupId) {
        Assert.notNull(messageGroupId, "'messageGroupId' must not be null");
        this.messageGroupId = messageGroupId;
    }

    /**
     * Create a handler with the given group id and task scheduler.
     *
     * @param messageGroupId the message group id; must not be {@code null}
     * @param taskScheduler the scheduler passed on to {@code setTaskScheduler}
     */
    public DelayHandler(String messageGroupId, TaskScheduler taskScheduler) {
        this(messageGroupId);
        setTaskScheduler(taskScheduler);
    }

    /**
     * Set the delay, in milliseconds, used when no delay expression is configured
     * or when the expression does not yield a usable value.
     *
     * @param defaultDelay the default delay in milliseconds
     */
    public void setDefaultDelay(long defaultDelay) {
        this.defaultDelay = defaultDelay;
    }

    /**
     * Set the expression evaluated against each message to obtain its delay.
     * A {@link Date} result is treated as the release time; any other non-null
     * result is parsed as a number of milliseconds.
     *
     * @param delayExpression the delay expression
     */
    public void setDelayExpression(Expression delayExpression) {
        this.delayExpression = delayExpression;
    }

    /**
     * Parse the given string into the delay expression.
     *
     * @param delayExpression the expression source text
     */
    public void setDelayExpressionString(String delayExpression) {
        this.delayExpression = EXPRESSION_PARSER.parseExpression(delayExpression);
    }

    /**
     * Choose whether a failure while determining the delay is only logged
     * ({@code true}, the default) or raised as an {@link IllegalStateException}.
     *
     * @param ignoreExpressionFailures {@code true} to log and fall back
     */
    public void setIgnoreExpressionFailures(boolean ignoreExpressionFailures) {
        this.ignoreExpressionFailures = ignoreExpressionFailures;
    }

    /**
     * Set the store that holds delayed messages.
     *
     * @param messageStore the store; must not be {@code null}
     * @throws IllegalStateException if {@code messageStore} is {@code null}
     */
    public void setMessageStore(MessageGroupStore messageStore) {
        Assert.state(messageStore != null, "MessageStore must not be null");
        this.messageStore = messageStore;
    }

    /**
     * Set the advices applied around the release of a delayed message.
     *
     * @param delayedAdviceChain the advice chain; must not be {@code null}
     */
    public void setDelayedAdviceChain(List<Advice> delayedAdviceChain) {
        Assert.notNull(delayedAdviceChain, "delayedAdviceChain must not be null");
        this.delayedAdviceChain = delayedAdviceChain;
    }

    /**
     * Set the channel that receives an {@link ErrorMessage} when a release fails.
     *
     * @param delayedMessageErrorChannel the error channel
     */
    public void setDelayedMessageErrorChannel(MessageChannel delayedMessageErrorChannel) {
        this.delayedMessageErrorChannel = delayedMessageErrorChannel;
    }

    /**
     * Set the name of the error channel; it is resolved on first use when no
     * channel instance has been set.
     *
     * @param delayedMessageErrorChannelName the error channel name
     */
    public void setDelayedMessageErrorChannelName(String delayedMessageErrorChannelName) {
        this.delayedMessageErrorChannelName = delayedMessageErrorChannelName;
    }

    /**
     * Set the number of release attempts after which a message is discarded.
     *
     * @param maxAttempts the maximum number of attempts
     */
    public void setMaxAttempts(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /**
     * Set the pause, in milliseconds, before a failed release is retried.
     * A value of zero or less retries without a pause.
     *
     * @param retryDelay the retry delay in milliseconds
     */
    public void setRetryDelay(long retryDelay) {
        this.retryDelay = retryDelay;
    }

    /**
     * Return the error channel, resolving it by name and caching the result when
     * only a name has been configured. May return {@code null}.
     */
    private MessageChannel getErrorChannel() {
        if (this.delayedMessageErrorChannel != null) {
            return this.delayedMessageErrorChannel;
        }
        DestinationResolver<MessageChannel> channelResolver = getChannelResolver();
        if (this.delayedMessageErrorChannelName != null && channelResolver != null) {
            this.delayedMessageErrorChannel = channelResolver.resolveDestination(this.delayedMessageErrorChannelName);
        }
        return this.delayedMessageErrorChannel;
    }

    @Override
    public String getComponentType() {
        return "delayer";
    }

    @Override
    public IntegrationPatternType getIntegrationPatternType() {
        return IntegrationPatternType.delayer;
    }

    /**
     * Fall back to a {@link SimpleMessageStore} when no store was configured
     * (a configured store must also implement {@link MessageStore}), then
     * prepare the evaluation context and the release handler.
     */
    @Override
    protected void doInit() {
        if (this.messageStore == null) {
            this.messageStore = new SimpleMessageStore();
        } else {
            Assert.isInstanceOf(MessageStore.class, this.messageStore);
        }
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
        this.releaseHandler = createReleaseMessageTask();
    }

    /**
     * Build the release handler: the plain {@link ReleaseMessageHandler}, or a
     * proxy around it carrying every configured advice.
     */
    private MessageHandler createReleaseMessageTask() {
        ReleaseMessageHandler target = new ReleaseMessageHandler();
        if (CollectionUtils.isEmpty(this.delayedAdviceChain)) {
            return target;
        }
        ProxyFactory proxyFactory = new ProxyFactory(target);
        for (Advice advice : this.delayedAdviceChain) {
            proxyFactory.addAdvice(advice);
        }
        return (MessageHandler) proxyFactory.getProxy(getApplicationContext().getClassLoader());
    }

    @Override
    protected boolean shouldCopyRequestHeaders() {
        return false;
    }

    /**
     * Delay the message or pass it through.
     *
     * @param message the incoming message, or a message whose payload is a
     * {@link DelayedMessageWrapper} when invoked for a release
     * @return the original message of an already delayed message, the message
     * itself when its delay is not positive, or {@code null} when the message has
     * been scheduled for later release
     */
    @Override
    protected Object handleRequestMessage(Message<?> message) {
        Object payload = message.getPayload();
        if (payload instanceof DelayedMessageWrapper) {
            // Already delayed: unwrap and let it continue downstream.
            return ((DelayedMessageWrapper) payload).getOriginal();
        }
        long delay = determineDelayForMessage(message);
        if (delay > 0) {
            releaseMessageAfterDelay(message, delay);
            return null;
        }
        return message;
    }

    /** Use the delay expression when one is configured, the default delay otherwise. */
    private long determineDelayForMessage(Message<?> message) {
        return this.delayExpression != null ? determineDelayFromExpression(message) : this.defaultDelay;
    }

    /**
     * Evaluate the delay expression against the message (or against the original
     * message when the payload is a {@link DelayedMessageWrapper}).
     *
     * <p>A {@link Date} result yields the distance from the wrapper's request
     * date (or from now, for a message that is not wrapped) to that date. Any
     * other non-null result is parsed as a {@code long}. When evaluation or
     * parsing fails, the default delay is returned unless expression failures
     * are not ignored.
     */
    private long determineDelayFromExpression(Message<?> message) {
        long delay = this.defaultDelay;
        DelayedMessageWrapper wrapper = null;
        if (message.getPayload() instanceof DelayedMessageWrapper) {
            wrapper = (DelayedMessageWrapper) message.getPayload();
        }
        Exception failure = null;
        Object delayValue = null;
        try {
            delayValue = this.delayExpression.getValue(this.evaluationContext, wrapper != null ? wrapper.getOriginal() : message);
        } catch (EvaluationException e) {
            failure = e;
        }
        if (delayValue instanceof Date) {
            long reference = wrapper != null ? wrapper.getRequestDate() : System.currentTimeMillis();
            delay = ((Date) delayValue).getTime() - reference;
        } else if (delayValue != null) {
            try {
                delay = Long.parseLong(delayValue.toString());
            } catch (NumberFormatException e) {
                failure = e;
            }
        }
        if (failure != null) {
            handleDelayValueException(failure);
        }
        return delay;
    }

    /**
     * Log the failure at debug level, or throw when failures are not ignored.
     *
     * @throws IllegalStateException if {@code ignoreExpressionFailures} is {@code false}
     */
    private void handleDelayValueException(Exception exc) {
        if (!this.ignoreExpressionFailures) {
            throw new IllegalStateException("Error occurred during 'delay' value determination", exc);
        }
        this.logger.debug(() -> "Failed to get delay value from 'delayExpression': " + exc.getMessage() + ". Will fall back to default delay: " + this.defaultDelay);
    }

    /**
     * Schedule the release of the message at request date plus {@code delay}.
     * A message that is not yet wrapped is wrapped and added to the store first.
     * When a transaction with active synchronization is in progress, the
     * scheduling itself is deferred until after commit.
     */
    private void releaseMessageAfterDelay(Message<?> message, long delay) {
        final DelayedMessageWrapper wrapper;
        final Message<?> delayedMessage;
        if (message.getPayload() instanceof DelayedMessageWrapper) {
            wrapper = (DelayedMessageWrapper) message.getPayload();
            delayedMessage = message;
        } else {
            wrapper = new DelayedMessageWrapper(message, Instant.now().toEpochMilli());
            delayedMessage = getMessageBuilderFactory().withPayload(wrapper).copyHeaders(message.getHeaders()).build();
            this.messageStore.addMessageToGroup(this.messageGroupId, delayedMessage);
        }
        final Runnable releaseTask = releaseTaskForMessage(delayedMessage);
        final Instant releaseTime = Instant.ofEpochMilli(wrapper.getRequestDate()).plusMillis(delay);
        if (TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive()) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
                @Override
                public void afterCommit() {
                    DelayHandler.this.getTaskScheduler().schedule(releaseTask, releaseTime);
                }
            });
        } else {
            getTaskScheduler().schedule(releaseTask, releaseTime);
        }
    }

    /**
     * Create the task for the first release of a delayed message. With a
     * {@link SimpleMessageStore} the given instance is released directly;
     * otherwise the message is looked up in the store by id when the task runs
     * and nothing happens if it is no longer there.
     */
    private Runnable releaseTaskForMessage(Message<?> message) {
        if (this.messageStore instanceof SimpleMessageStore) {
            return () -> releaseMessage(message);
        }
        UUID id = message.getHeaders().getId();
        return () -> {
            Message<?> stored = getMessageById(id);
            if (stored != null) {
                releaseMessage(stored);
            }
        };
    }

    /** Fetch a message from the store by id; logs at debug level and returns {@code null} when absent. */
    private Message<?> getMessageById(UUID uuid) {
        Message<?> message = ((MessageStore) this.messageStore).getMessage(uuid);
        if (message != null) {
            return message;
        }
        this.logger.debug(() -> "No message in the Message Store for id: " + uuid + ". Likely another instance has already released it.");
        return null;
    }

    /**
     * Release the message through the release handler and deal with a failure.
     *
     * <p>Without an error channel a failed release is rescheduled; once the
     * attempts are exhausted the exception is rethrown to the caller. With an
     * error channel an {@link ErrorMessage} carrying the delivery attempt is
     * sent; if that send fails or throws, the release is rescheduled.
     */
    private void releaseMessage(Message<?> message) {
        String identity = ObjectUtils.getIdentityHexString(message);
        this.deliveries.putIfAbsent(identity, new AtomicInteger());
        try {
            this.releaseHandler.handleMessage(message);
            this.deliveries.remove(identity);
        } catch (Exception e) {
            MessageChannel errorChannel = getErrorChannel();
            if (errorChannel == null) {
                this.logger.debug(e, () -> "Release flow threw an exception for message: " + message);
                if (!rescheduleForRetry(message, identity)) {
                    throw e;
                }
                return;
            }
            Map<String, Object> headers = Collections.singletonMap(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger(currentAttempts(identity) + 1));
            ErrorMessage errorMessage = new ErrorMessage(e, headers, message);
            try {
                if (errorChannel.send(errorMessage)) {
                    this.deliveries.remove(identity);
                } else {
                    this.logger.debug(() -> "Failed to send error message: " + errorMessage);
                    rescheduleForRetry(message, identity);
                }
            } catch (Exception e2) {
                this.logger.debug(e2, () -> "Error flow threw an exception for message: " + message);
                rescheduleForRetry(message, identity);
            }
        }
    }

    /**
     * Read the attempt counter for a message identity. Returns zero when the
     * counter is gone, which can happen if another task released the same
     * message instance and cleared it in the meantime.
     */
    private int currentAttempts(String identity) {
        AtomicInteger attempts = this.deliveries.get(identity);
        return attempts != null ? attempts.get() : 0;
    }

    /**
     * Count a failed attempt and schedule another one.
     *
     * @return {@code false} when the maximum number of attempts has been reached
     * and the message is discarded, {@code true} when a retry was scheduled
     */
    private boolean rescheduleForRetry(Message<?> message, String identity) {
        // computeIfAbsent: the counter may have been cleared by a concurrent release.
        int attempts = this.deliveries.computeIfAbsent(identity, key -> new AtomicInteger()).incrementAndGet();
        if (attempts >= this.maxAttempts) {
            this.logger.error(() -> "Discarding; maximum release attempts reached for: " + message);
            this.deliveries.remove(identity);
            return false;
        }
        if (this.retryDelay <= 0) {
            rescheduleNow(message);
        } else {
            rescheduleAt(message, new Date(System.currentTimeMillis() + this.retryDelay));
        }
        return true;
    }

    private void rescheduleNow(Message<?> message) {
        rescheduleAt(message, new Date());
    }

    /**
     * Schedule a retry of the release at the given time.
     *
     * <p>The given message instance is released directly rather than being read
     * back from the store: by the time a retry is scheduled the first attempt has
     * already removed the message from the store, so a lookup by id would find
     * nothing and the retry would be skipped. Keeping the instance also keeps the
     * attempt counter, which is keyed by instance identity.
     *
     * @param message the message whose release failed
     * @param date the time of the next attempt
     */
    protected void rescheduleAt(Message<?> message, Date date) {
        getTaskScheduler().schedule(() -> releaseMessage(message), date.toInstant());
    }

    /**
     * Perform one release attempt. The message is processed when it could be
     * removed from the store (first attempt) or when its attempt counter is
     * positive (retry); otherwise it is assumed to have been released elsewhere.
     */
    private void doReleaseMessage(Message<?> message) {
        boolean removedFromStore = removeDelayedMessageFromMessageStore(message);
        if (!removedFromStore && currentAttempts(ObjectUtils.getIdentityHexString(message)) <= 0) {
            this.logger.debug(() -> "No message in the Message Store to release: " + message + ". Likely another instance has already released it.");
            return;
        }
        if (!(this.messageStore instanceof SimpleMessageStore)) {
            this.messageStore.removeMessagesFromGroup(this.messageGroupId, message);
        }
        handleMessageInternal(message);
    }

    /**
     * Remove the delayed message from the store.
     *
     * @return {@code true} if this call removed the message, {@code false} if the
     * store did not contain it
     */
    private boolean removeDelayedMessageFromMessageStore(Message<?> message) {
        if (!(this.messageStore instanceof SimpleMessageStore)) {
            return ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null;
        }
        // The contains-then-remove pair must not interleave with another release.
        synchronized (this.storeMonitor) {
            if (!this.messageStore.getMessageGroup(this.messageGroupId).getMessages().contains(message)) {
                return false;
            }
            this.messageStore.removeMessagesFromGroup(this.messageGroupId, message);
            return true;
        }
    }

    /**
     * @return the size of this handler's message group in the store
     */
    @Override
    public int getDelayedMessageCount() {
        return this.messageStore.messageGroupSize(this.messageGroupId);
    }

    /**
     * Schedule every message currently in this handler's message group. Each
     * task recomputes the delay when it runs: a positive delay schedules the
     * release relative to the stored request date, otherwise the message is
     * released right away.
     */
    @Override
    public synchronized void reschedulePersistedMessages() {
        try (Stream<Message<?>> delayedMessages = this.messageStore.getMessageGroup(this.messageGroupId).streamMessages()) {
            TaskScheduler taskScheduler = getTaskScheduler();
            delayedMessages.forEach(message -> {
                taskScheduler.schedule(() -> {
                    long delay = determineDelayForMessage(message);
                    if (delay > 0) {
                        releaseMessageAfterDelay(message, delay);
                    } else {
                        releaseMessage(message);
                    }
                }, Instant.now());
            });
        }
    }

    /**
     * Reschedule persisted messages on the first refresh event published by this
     * handler's own application context; later events and events from other
     * contexts are ignored.
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        boolean ownContext = contextRefreshedEvent.getApplicationContext().equals(getApplicationContext());
        if (ownContext && !this.initialized.getAndSet(true)) {
            reschedulePersistedMessages();
        }
    }
}
