package org.openhubframework.openhub.core.common.asynch.repair;

import java.time.Instant;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.openhubframework.openhub.api.configuration.ConfigurableValue;
import org.openhubframework.openhub.api.configuration.ConfigurationItem;
import org.openhubframework.openhub.api.entity.Message;
import org.openhubframework.openhub.api.entity.MsgStateEnum;
import org.openhubframework.openhub.api.exception.IntegrationException;
import org.openhubframework.openhub.api.exception.InternalErrorEnum;
import org.openhubframework.openhub.common.time.Seconds;
import org.openhubframework.openhub.core.common.dao.MessageDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

@Service(RepairMessageService.BEAN)
/* loaded from: input_file:org/openhubframework/openhub/core/common/asynch/repair/RepairMessageServiceDbImpl.class */
public class RepairMessageServiceDbImpl implements RepairMessageService {
    private static final Logger LOG = LoggerFactory.getLogger(RepairMessageServiceDbImpl.class);
    private static final int BATCH_SIZE = 10;
    private TransactionTemplate transactionTemplate;

    @Autowired
    private MessageDao messageDao;

    @Autowired
    private ProducerTemplate producerTemplate;

    @ConfigurableValue(key = "ohf.asynch.repairRepeatTimeSec")
    private ConfigurationItem<Seconds> repeatInterval;

    @ConfigurableValue(key = "ohf.asynch.countPartlyFailsBeforeFailed")
    private ConfigurationItem<Integer> countPartlyFailsBeforeFailed;

    @Autowired
    public RepairMessageServiceDbImpl(PlatformTransactionManager platformTransactionManager) {
        Assert.notNull(platformTransactionManager, "the transactionManager must not be null");
        this.transactionTemplate = new TransactionTemplate(platformTransactionManager);
    }

    @Override // org.openhubframework.openhub.core.common.asynch.repair.RepairMessageService
    public void repairProcessingMessages() {
        List<Message> findProcessingMessages = findProcessingMessages();
        LOG.debug("Found {} message(s) for repairing ...", Integer.valueOf(findProcessingMessages.size()));
        int i = 0;
        while (true) {
            int i2 = i;
            if (i2 >= findProcessingMessages.size()) {
                return;
            }
            int min = Math.min(i2 + BATCH_SIZE, findProcessingMessages.size());
            updateMessagesInDB(findProcessingMessages.subList(i2, min));
            i = min;
        }
    }

    private List<Message> findProcessingMessages() {
        return (List) this.transactionTemplate.execute(new TransactionCallback<List<Message>>() { // from class: org.openhubframework.openhub.core.common.asynch.repair.RepairMessageServiceDbImpl.1
            /* renamed from: doInTransaction, reason: merged with bridge method [inline-methods] */
            public List<Message> m28doInTransaction(TransactionStatus transactionStatus) {
                return RepairMessageServiceDbImpl.this.messageDao.findProcessingMessages(RepairMessageServiceDbImpl.this.repeatInterval.getValue().toDuration());
            }
        });
    }

    private void updateMessagesInDB(final List<Message> list) {
        final Instant now = Instant.now();
        this.transactionTemplate.execute(new TransactionCallbackWithoutResult() { // from class: org.openhubframework.openhub.core.common.asynch.repair.RepairMessageServiceDbImpl.2
            protected void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
                for (final Message message : list) {
                    if (message.getFailedCount() >= ((Integer) RepairMessageServiceDbImpl.this.countPartlyFailsBeforeFailed.getValue()).intValue()) {
                        RepairMessageServiceDbImpl.LOG.warn("The message " + message.toHumanString() + " was in PROCESSING state and exceeded max. count of failures. Message is redirected to processing of failed message.");
                        RepairMessageServiceDbImpl.this.producerTemplate.send("direct:errorAsyncFatal", ExchangePattern.InOnly, new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.repair.RepairMessageServiceDbImpl.2.1
                            public void process(Exchange exchange) throws Exception {
                                exchange.setProperty("CamelExceptionCaught", new IntegrationException(InternalErrorEnum.E116));
                                exchange.getIn().setHeader("processingMessage", message);
                            }
                        });
                    } else {
                        message.setLastUpdateTimestamp(now);
                        message.setState(MsgStateEnum.PARTLY_FAILED);
                        message.setFailedCount(message.getFailedCount() + 1);
                        RepairMessageServiceDbImpl.this.messageDao.update(message);
                        RepairMessageServiceDbImpl.LOG.warn("The message " + message.toHumanString() + " was in PROCESSING state and changed to PARTLY_FAILED.", message.getMsgId(), message.getCorrelationId());
                    }
                }
            }
        });
    }
}
