package org.openhubframework.openhub.core.common.asynch.queue;

import java.time.Instant;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.ExchangeBuilder;
import org.openhubframework.openhub.api.configuration.ConfigurableValue;
import org.openhubframework.openhub.api.configuration.ConfigurationItem;
import org.openhubframework.openhub.api.entity.Message;
import org.openhubframework.openhub.api.exception.IntegrationException;
import org.openhubframework.openhub.api.exception.InternalErrorEnum;
import org.openhubframework.openhub.api.exception.LockFailureException;
import org.openhubframework.openhub.common.time.Seconds;
import org.openhubframework.openhub.core.common.asynch.AsynchMessageRoute;
import org.openhubframework.openhub.core.common.asynch.LogContextHelper;
import org.openhubframework.openhub.core.common.event.AsynchEventHelper;
import org.openhubframework.openhub.spi.msg.MessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

@Service
/* loaded from: input_file:org/openhubframework/openhub/core/common/asynch/queue/MessagePollExecutor.class */
public class MessagePollExecutor implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MessagePollExecutor.class);
    private static final int LOCK_FAILURE_LIMIT = 5;

    @Autowired
    private MessagesPool messagesPool;

    @Autowired
    private ProducerTemplate producerTemplate;

    @Autowired
    private MessageService messageService;

    @ConfigurableValue(key = "ohf.asynch.postponedIntervalWhenFailedSec")
    private ConfigurationItem<Seconds> postponedIntervalWhenFailed;
    private String targetURI = AsynchMessageRoute.URI_ASYNC_PROCESSING_MSG;

    @Override // java.lang.Runnable
    public void run() {
        LOG.debug("Message pooling starts ...");
        Message message = null;
        int i = 0;
        while (true) {
            try {
                message = this.messagesPool.getNextMessage();
            } catch (Exception e) {
                LOG.error("Error occurred during getting message " + (message != null ? message.toHumanString() : ""), e);
            } catch (LockFailureException unused) {
                i++;
                if (i > LOCK_FAILURE_LIMIT) {
                    LOG.warn("Probably problem with locking messages - count of lock failures exceeds limit (5).");
                    break;
                }
            }
            if (message == null) {
                break;
            }
            LogContextHelper.setLogContextParams(message, null);
            startMessageProcessing(message);
        }
        LOG.debug("Message pooling finished.");
    }

    void startMessageProcessing(final Message message) {
        Assert.notNull(message, "the msg must not be null");
        if (isMsgInGuaranteedOrder(message)) {
            this.producerTemplate.sendBodyAndHeader(this.targetURI, ExchangePattern.InOnly, message, "insertMsgToQueue", Long.valueOf(System.currentTimeMillis()));
            return;
        }
        if (message.getReceiveTimestamp().isBefore(Instant.now().minusSeconds(this.postponedIntervalWhenFailed.getValue().getSeconds()))) {
            this.producerTemplate.send("direct:errorAsyncFatal", ExchangePattern.InOnly, new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.queue.MessagePollExecutor.1
                public void process(Exchange exchange) throws Exception {
                    exchange.setProperty("CamelExceptionCaught", new IntegrationException(InternalErrorEnum.E121, "Message " + message.toHumanString() + " exceeded interval for starting processing => changed to FAILED state"));
                    exchange.getIn().setHeader("processingMessage", message);
                }
            });
            return;
        }
        this.messageService.setStatePostponed(message);
        Exchange build = ExchangeBuilder.anExchange(this.producerTemplate.getCamelContext()).build();
        build.getIn().setHeader("processingMessage", message);
        AsynchEventHelper.notifyMsgPostponed(build);
    }

    private boolean isMsgInGuaranteedOrder(Message message) {
        if (!message.isGuaranteedOrder()) {
            return true;
        }
        List messagesForGuaranteedOrderForRoute = this.messageService.getMessagesForGuaranteedOrderForRoute(message.getFunnelValue(), message.isExcludeFailedState());
        if (messagesForGuaranteedOrderForRoute.size() == 1) {
            LOG.debug("There is only one processing message with funnel value: " + message.getFunnelValue() + " => continue");
            return true;
        }
        if (((Message) messagesForGuaranteedOrderForRoute.get(0)).equals(message)) {
            LOG.debug("Processing message (msg_id = {}, funnel value = '{}') is the first one => continue", message.getMsgId(), message.getFunnelValue());
            return true;
        }
        LOG.debug("There is at least one processing message with funnel value '{}' before current message (msg_id = {}); message {} will be postponed.", new Object[]{message.getFunnelValue(), message.getMsgId(), message.toHumanString()});
        return false;
    }
}
