package org.openhubframework.openhub.core.common.asynch;

import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import org.apache.camel.Body;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Handler;
import org.apache.camel.Headers;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
import org.apache.camel.model.ChoiceDefinition;
import org.openhubframework.openhub.api.asynch.model.CallbackResponse;
import org.openhubframework.openhub.api.asynch.model.ConfirmationTypes;
import org.openhubframework.openhub.api.entity.Message;
import org.openhubframework.openhub.api.exception.IntegrationException;
import org.openhubframework.openhub.api.exception.InternalErrorEnum;
import org.openhubframework.openhub.api.exception.StoppingException;
import org.openhubframework.openhub.api.exception.ThrottlingExceededException;
import org.openhubframework.openhub.api.route.AbstractBasicRoute;
import org.openhubframework.openhub.api.route.CamelConfiguration;
import org.openhubframework.openhub.core.common.asynch.msg.MessageTransformer;
import org.openhubframework.openhub.core.common.event.AsynchEventHelper;
import org.openhubframework.openhub.core.common.exception.ExceptionTranslator;
import org.openhubframework.openhub.core.common.validator.TraceIdentifierValidator;
import org.openhubframework.openhub.spi.msg.MessageService;
import org.openhubframework.openhub.spi.throttling.ThrottleScope;
import org.openhubframework.openhub.spi.throttling.ThrottlingProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;

@CamelConfiguration(AsynchInMessageRoute.ROUTE_BEAN)
/* loaded from: input_file:org/openhubframework/openhub/core/common/asynch/AsynchInMessageRoute.class */
public class AsynchInMessageRoute extends AbstractBasicRoute {
    private static final Logger LOG = LoggerFactory.getLogger(AsynchInMessageRoute.class);
    public static final String ROUTE_BEAN = "inMsgRouteBean";
    public static final String ROUTE_ID_ASYNC = "asyncProcessIn_route";
    static final int NEW_MSG_PRIORITY = 10;
    static final String URI_GUARANTEED_ORDER_ROUTE = "direct:guaranteedOrderRoute";
    static final String ROUTE_ID_GUARANTEED_ORDER = "guaranteedOrder_route";

    @Autowired
    private ThrottlingProcessor throttlingProcessor;

    @Autowired
    private MessageService messageService;

    @Autowired(required = false)
    private List<TraceIdentifierValidator> validatorList;

    public void doConfigure() throws Exception {
        from("direct:asynch_in_message_route").routeId(ROUTE_ID_ASYNC).doTry().validate(header("asynchService").isNotNull()).validate(header("asynchOperation").isNotNull()).process(new TraceHeaderProcessor(true, this.validatorList)).removeHeader("CamelSpringWebserviceSoapHeader").bean(MessageTransformer.getInstance(), "createMessage").process(new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.AsynchInMessageRoute.1
            public void process(Exchange exchange) throws Exception {
                Message message = (Message) exchange.getIn().getBody(Message.class);
                Assert.notNull(message, "the msg must not be null");
                AsynchInMessageRoute.this.throttlingProcessor.throttle(new ThrottleScope(message.getSourceSystem().getSystemName(), message.getOperationName()));
            }
        }).id("throttleProcess").bean(ROUTE_BEAN, "insertMessage").to(URI_GUARANTEED_ORDER_ROUTE).bean(ROUTE_BEAN, "createOkResponse").endDoTry().doCatch(ThrottlingExceededException.class).log(LoggingLevel.ERROR, "Incoming route - throttling rules were exceeded: ${property.CamelExceptionCaught.message}.").process(new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.AsynchInMessageRoute.2
            public void process(Exchange exchange) throws Exception {
                throw ((Exception) exchange.getProperty("CamelExceptionCaught"));
            }
        }).doCatch(StoppingException.class).log(LoggingLevel.INFO, "Incoming route - asynchronous message was rejected because ESB was stopping.").process(new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.AsynchInMessageRoute.3
            public void process(Exchange exchange) throws Exception {
                throw ((Exception) exchange.getProperty("CamelExceptionCaught"));
            }
        }).doCatch(new Class[]{SQLException.class, Exception.class}).process(new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.AsynchInMessageRoute.4
            public void process(Exchange exchange) throws Exception {
                AsynchInMessageRoute.LOG.error("Incoming route - error during saving incoming message: ", (Exception) exchange.getProperty("CamelExceptionCaught"));
            }
        }).bean(AsynchInMessageRoute.class, "createFailResponse").end().process(new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.AsynchInMessageRoute.5
            public void process(Exchange exchange) throws Exception {
            }
        });
        ((ChoiceDefinition) from(URI_GUARANTEED_ORDER_ROUTE).routeId(ROUTE_ID_GUARANTEED_ORDER).errorHandler(noErrorHandler()).validate(body().isInstanceOf(Message.class)).choice().when().method(ROUTE_BEAN, "isMsgInGuaranteedOrder")).bean(ROUTE_BEAN, "saveLogContextParams").bean(ROUTE_BEAN, "setInsertQueueTimestamp").bean(ROUTE_BEAN, "setMsgPriority").to(ExchangePattern.RobustInOnly, "direct:asyncProcessingIn").id("toAsyncRoute").otherwise().bean(ROUTE_BEAN, "postponeMessage").end().process(new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.AsynchInMessageRoute.6
            public void process(Exchange exchange) throws Exception {
            }
        });
    }

    @Handler
    public Message insertMessage(@Body Message message) {
        Assert.notNull(message, "msg can not be null");
        LOG.debug("Insert new asynch message '" + message.toHumanString() + "'.");
        this.messageService.insertMessage(message);
        return message;
    }

    @Handler
    public boolean isMsgInGuaranteedOrder(@Body Message message) {
        if (!message.isGuaranteedOrder()) {
            return true;
        }
        List messagesForGuaranteedOrderForRoute = ((MessageService) getBean(MessageService.class)).getMessagesForGuaranteedOrderForRoute(message.getFunnelValue(), message.isExcludeFailedState());
        if (messagesForGuaranteedOrderForRoute.size() == 1) {
            LOG.debug("There is only one processing message with funnel value: " + message.getFunnelValue() + " => continue");
            return true;
        }
        if (((Message) messagesForGuaranteedOrderForRoute.get(0)).equals(message)) {
            LOG.debug("Processing message (msg_id = {}, funnel value = '{}') is the first one => continue", message.getMsgId(), message.getFunnelValue());
            return true;
        }
        LOG.debug("There is at least one processing message with funnel value '{}' before current message (msg_id = {}); message {} will be postponed.", new Object[]{message.getFunnelValue(), message.getMsgId(), message.toHumanString()});
        return false;
    }

    @Handler
    public void postponeMessage(Exchange exchange, @Body Message message) {
        exchange.getIn().setHeader("processingMessage", message);
        ((MessageService) getBean(MessageService.class)).setStatePostponed(message);
        AsynchEventHelper.notifyMsgPostponed(exchange);
    }

    @Handler
    public void saveLogContextParams(@Body Message message, @Headers Map<String, Object> map) {
        Map copyOfContextMap = MDC.getCopyOfContextMap();
        String str = null;
        if (copyOfContextMap != null && copyOfContextMap.get("REQUEST_ID") != null) {
            str = (String) copyOfContextMap.get("REQUEST_ID");
            map.put("REQUEST_ID", str);
        }
        LogContextHelper.setLogContextParams(message, str);
    }

    @Handler
    public void setInsertQueueTimestamp(@Headers Map<String, Object> map) {
        map.put("insertMsgToQueue", Long.valueOf(System.currentTimeMillis()));
    }

    @Handler
    public void setMsgPriority(@Body Message message) {
        message.setProcessingPriority(NEW_MSG_PRIORITY);
    }

    @Handler
    public CallbackResponse createOkResponse(Exchange exchange) {
        CallbackResponse callbackResponse = new CallbackResponse();
        callbackResponse.setStatus(ConfirmationTypes.OK);
        return callbackResponse;
    }

    @Handler
    public void createFailResponse(Exchange exchange) {
        if (exchange.getProperty("errorCallbackResponse") != null) {
            return;
        }
        CallbackResponse callbackResponse = new CallbackResponse();
        callbackResponse.setStatus(ConfirmationTypes.FAIL);
        IntegrationException integrationException = (Exception) exchange.getProperty("CamelExceptionCaught");
        callbackResponse.setAdditionalInfo(integrationException instanceof IntegrationException ? integrationException.getError() + ": " + integrationException.getMessage() : ExceptionTranslator.composeErrorMessage(InternalErrorEnum.E106, integrationException));
        exchange.setProperty("errorCallbackResponse", callbackResponse);
    }
}
