package org.openhubframework.openhub.core.common.asynch;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.camel.Body;
import org.apache.camel.Exchange;
import org.apache.camel.Handler;
import org.apache.camel.Header;
import org.apache.camel.Headers;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
import org.apache.camel.model.ChoiceDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.openhubframework.openhub.api.common.EmailService;
import org.openhubframework.openhub.api.configuration.ConfigurableValue;
import org.openhubframework.openhub.api.configuration.ConfigurationItem;
import org.openhubframework.openhub.api.entity.ExternalCall;
import org.openhubframework.openhub.api.entity.Message;
import org.openhubframework.openhub.api.entity.MsgStateEnum;
import org.openhubframework.openhub.api.exception.LockFailureException;
import org.openhubframework.openhub.api.exception.StoppingException;
import org.openhubframework.openhub.api.route.AbstractBasicRoute;
import org.openhubframework.openhub.api.route.CamelConfiguration;
import org.openhubframework.openhub.core.common.asynch.confirm.ConfirmationService;
import org.openhubframework.openhub.core.common.event.AsynchEventHelper;
import org.openhubframework.openhub.spi.msg.MessageService;
import org.openhubframework.openhub.spi.node.NodeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

@CamelConfiguration(AsynchMessageRoute.ROUTE_BEAN)
/* loaded from: input_file:org/openhubframework/openhub/core/common/asynch/AsynchMessageRoute.class */
public class AsynchMessageRoute extends AbstractBasicRoute {
    private static final Logger LOG = LoggerFactory.getLogger(AsynchMessageRoute.class);

    /** Bean name of this route class; used for the {@code bean(...)} and {@code method(...)} calls in the route definitions. */
    public static final String ROUTE_BEAN = "msgRouteBean";

    /** Id of the route consuming from {@link #URI_SYNC_MSG}. */
    public static final String ROUTE_ID_SYNC = "syncProcessOut_route";

    /** Id of the route that marks the message as queued and sends it to {@link #URI_ASYNC_PROCESSING_MSG}. */
    public static final String ROUTE_ID_ASYNC = "asyncProcess_route";

    /** Id of the route consuming from {@link #URI_ASYNC_PROCESSING_MSG}. */
    private static final String ROUTE_ID_ASYNC_OUT = "asyncProcessOut_route";

    /** Id of the route consuming from {@code direct:errorAsyncHandling}. */
    public static final String ROUTE_ID_ASYNCH_ERROR_HANDLING = "asynchProcessOutErrHandling_route";

    /** Id of the route consuming from {@code direct:errorAsyncFatal}. */
    public static final String ROUTE_ID_ERROR_FATAL = "asynchProcessOutErrFatal_route";

    /** Id of the route consuming from {@code direct:postProcessAfterFatal}. */
    public static final String ROUTE_ID_POST_PROCESS_AFTER_FAILED = "postProcessAfterFatal_route";

    /** Id of the route consuming from {@code direct:postProcessAfterOK}. */
    public static final String ROUTE_ID_POST_PROCESS_AFTER_OK = "postProcessAfterOK_route";

    /** Id of the route consuming from {@code direct:asynch_message_confirm}. */
    public static final String ROUTE_ID_CONFIRM_MESSAGE = "asynchConfirm_route";

    /**
     * SEDA endpoint URI that queued messages are sent to and consumed from.
     * The number of concurrent consumers comes from the {@code ohf.asynch.concurrentConsumers} placeholder
     * and the queue is created by the {@code priorityQueueFactory} bean referenced in the URI.
     */
    public static final String URI_ASYNC_PROCESSING_MSG = "seda:asynch_message_route?concurrentConsumers={{ohf.asynch.concurrentConsumers}}&waitForTaskToComplete=Never&blockWhenFull=true&queueFactory=#priorityQueueFactory";

    /** Direct endpoint URI of the main message processing route. */
    public static final String URI_SYNC_MSG = "direct:sync_message_route";

    /**
     * Threshold (configuration key {@code ohf.asynch.countPartlyFailsBeforeFailed}) that the failed count
     * of a message is compared against in {@code checkMessageFailed}.
     */
    @ConfigurableValue(key = "ohf.asynch.countPartlyFailsBeforeFailed")
    private ConfigurationItem<Integer> countPartlyFailsBeforeFailed;

    /**
     * Defines all Camel routes of the asynchronous message processing.
     *
     * <p>The following routes are registered (see the {@code ROUTE_ID_*} constants):
     * <ul>
     *   <li>queueing of an incoming message into the SEDA endpoint,</li>
     *   <li>consuming from the SEDA endpoint and forwarding to the main processing route,</li>
     *   <li>the main processing route that dispatches the message payload via a routing slip
     *       and sets the resulting message state,</li>
     *   <li>error handling (partly failed vs. fatal failure),</li>
     *   <li>post-processing after success / fatal failure,</li>
     *   <li>confirmation of the processing result.</li>
     * </ul>
     *
     * @throws Exception declared by the route-definition contract
     */
    protected void doConfigure() throws Exception {
        // Context-wide settings: fault handling switched on, logging error handler installed.
        getContext().setHandleFault(true);
        getContext().setErrorHandlerBuilder(loggingErrorHandler("org.openhubframework.openhub"));

        // Catch-all clause: the exception is marked as handled, logged at DEBUG, the exchange property
        // "externalCallSuccess" is set to false and processing continues in the error handling route.
        onException(Exception.class).handled(true).log(LoggingLevel.DEBUG, "Asynch. routes - unspecified exception caught: ${property.CamelExceptionCaught.message}.").setProperty("externalCallSuccess", constant(false)).to("direct:errorAsyncHandling");

        // Entry route: switches the message state via changeStateMessageToInQueue and puts it into the SEDA queue.
        from("direct:asyncProcessingIn").routeId(ROUTE_ID_ASYNC).bean(ROUTE_BEAN, "changeStateMessageToInQueue").to(URI_ASYNC_PROCESSING_MSG);

        // SEDA consumer: sets log context parameters and forwards the message to the main processing route.
        from(URI_ASYNC_PROCESSING_MSG).routeId(ROUTE_ID_ASYNC_OUT).bean(ROUTE_BEAN, "setLogContextParams").to(URI_SYNC_MSG);

        // Main processing route:
        //  1. validates that the body is a Message in state IN_QUEUE,
        //  2. logs the start, checks that this node may handle existing messages,
        //  3. stops when isMessageObsolete returns true,
        //  4. stores entity info headers, keeps the Message in the "processingMessage" header
        //     and replaces the body with the message payload,
        //  5. notifies about processing and dispatches to the route returned by nextRoute (routing slip),
        //  6. sets the final state: partly failed without error ("noEffectProcess" header is TRUE),
        //     waiting (checkParentMessage is true) or OK followed by post-processing.
        ((ChoiceDefinition) ((ChoiceDefinition) from(URI_SYNC_MSG).routeId(ROUTE_ID_SYNC).setHeader("asynchMsgProcessing", constant(Boolean.TRUE)).validate(body().isInstanceOf(Message.class)).validate(simple("${body.state} == ${type:org.openhubframework.openhub.api.entity.MsgStateEnum.IN_QUEUE}")).bean(this, "logStartProcessing").bean(ROUTE_BEAN, "isAbleToHandleExistingMessage").choice().when().method(ROUTE_BEAN, "isMessageObsolete")).log(LoggingLevel.WARN, "Message ${body.toHumanString} was obsolete, stopped further processing.").stop().end().bean(this, "setEntityInfo").setHeader("processingMessage", body()).transform(simple("${body.payload}")).removeHeader(TraceHeaderProcessor.TRACE_HEADER).removeHeader("CamelSpringWebserviceSoapHeader").process(new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.AsynchMessageRoute.1
            public void process(Exchange exchange) throws Exception {
                AsynchEventHelper.notifyMsgProcessing(exchange);
            }
        }).routingSlip(method(ROUTE_BEAN, "nextRoute")).choice().when(header("noEffectProcess").isEqualTo(Boolean.TRUE)).bean("messageService", "setStatePartlyFailedWithoutError").when().method(ROUTE_BEAN, "checkParentMessage")).bean("messageService", "setStateWaiting").process(new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.AsynchMessageRoute.2
            public void process(Exchange exchange) throws Exception {
                // The waiting event is raised only when the message really is in the WAITING state.
                if (((Message) exchange.getIn().getHeader("processingMessage")).getState() == MsgStateEnum.WAITING) {
                    AsynchEventHelper.notifyMsgWaiting(exchange);
                }
            }
        }).otherwise().bean("messageService", "setStateOk").process(new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.AsynchMessageRoute.3
            public void process(Exchange exchange) throws Exception {
                AsynchEventHelper.notifyMsgCompleted(exchange);
            }
        }).to("direct:postProcessAfterOK").end().end();

        // Error handling route: requires the caught exception in the "CamelExceptionCaught" property, logs it,
        // then either continues in the fatal route (checkMessageFailed is true) or marks the message as
        // partly failed and raises the corresponding event. Errors raised here are only logged.
        ((ChoiceDefinition) from("direct:errorAsyncHandling").routeId(ROUTE_ID_ASYNCH_ERROR_HANDLING).onException(Exception.class).handled(true).log(LoggingLevel.ERROR, "Error while handling error: ${exception.stacktrace}").end().validate(exchangeProperty("CamelExceptionCaught").isNotNull()).log(LoggingLevel.WARN, "Error occurred during route processing: ${property.CamelExceptionCaught}").to("log:" + AsynchMessageRoute.class.getPackage().getName() + "?level=WARN&showCaughtException=true&showStackTrace=true&multiline=true").choice().when().method(ROUTE_BEAN, "checkMessageFailed")).to("direct:errorAsyncFatal").otherwise().bean("messageService", "setStatePartlyFailed").process(new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.AsynchMessageRoute.4
            public void process(Exchange exchange) throws Exception {
                AsynchEventHelper.notifyMsgPartlyFailed(exchange);
            }
        }).end().stop();

        // Fatal error route: sets the failed state, raises the failed event and runs post-processing.
        // Errors raised here are only logged.
        from("direct:errorAsyncFatal").routeId(ROUTE_ID_ERROR_FATAL).onException(Exception.class).handled(true).log(LoggingLevel.ERROR, "Error while handling error: ${exception.stacktrace}").end().bean("messageService", "setStateFailed").process(new Processor() { // from class: org.openhubframework.openhub.core.common.asynch.AsynchMessageRoute.5
            public void process(Exchange exchange) throws Exception {
                AsynchEventHelper.notifyMsgFailed(exchange);
            }
        }).to("direct:postProcessAfterFatal").stop();

        // Post-processing after success: the "processingMessage" header must be present, then confirmation follows.
        from("direct:postProcessAfterOK").routeId(ROUTE_ID_POST_PROCESS_AFTER_OK).validate(header("processingMessage").isNotNull()).to("direct:asynch_message_confirm");

        // Post-processing after fatal failure: confirmation followed by an e-mail sent via sendMailToAdmin.
        from("direct:postProcessAfterFatal").routeId(ROUTE_ID_POST_PROCESS_AFTER_FAILED).validate(header("processingMessage").isNotNull()).to("direct:asynch_message_confirm").bean(ROUTE_BEAN, "sendMailToAdmin").id("sendEmail");

        // Confirmation route:
        //  - on exception: logs at ERROR and calls confirmationFailed (body is an ExternalCall)
        //    or insertFailedConfirmation (any other body),
        //  - when the "processingMessage" header is missing, the body must be an ExternalCall and the header
        //    is filled from the MVEL expression "request.body.message",
        //  - calls the confirmation callback and, for an ExternalCall body, confirmationComplete.
        ((ProcessorDefinition) from("direct:asynch_message_confirm").routeId(ROUTE_ID_CONFIRM_MESSAGE).onException(Exception.class).handled(true).log(LoggingLevel.ERROR, "exception during confirmation caught - '${property.CamelExceptionCaught.message}'").choice().when(body().isInstanceOf(ExternalCall.class)).bean(ConfirmationService.BEAN, "confirmationFailed").otherwise().bean(ConfirmationService.BEAN, "insertFailedConfirmation").end().end().choice().when(header("processingMessage").isNull()).validate(body().isInstanceOf(ExternalCall.class)).setHeader("processingMessage").mvel("request.body.message")).endChoice().end().bean("confirmationCallback", "confirm").filter(body().isInstanceOf(ExternalCall.class)).bean(ConfirmationService.BEAN, "confirmationComplete").end();
    }

    /**
     * Passes the message and the value of the {@code REQUEST_ID} header to {@link LogContextHelper}.
     *
     * @param message the message taken from the exchange body
     * @param str     the value of the {@code REQUEST_ID} header, may be {@code null}
     */
    @Handler
    public void setLogContextParams(@Body Message message, @Header("REQUEST_ID") @Nullable String str) {
        final String requestId = str;
        LogContextHelper.setLogContextParams(message, requestId);
    }

    /**
     * Tries to switch the message into the processing state and reports whether that failed.
     *
     * @param message the message taken from the exchange body, must not be {@code null}
     * @return {@code true} when {@code setStateProcessingForLock} returned {@code false},
     *         {@code false} otherwise
     */
    @Handler
    public boolean isMessageObsolete(@Body Message message) {
        Assert.notNull(message, "the msg must not be null");

        final MessageService messageService = (MessageService) getBean(MessageService.class);
        final boolean locked = messageService.setStateProcessingForLock(message);
        return !locked;
    }

    /**
     * Writes a DEBUG entry announcing the start of message processing.
     *
     * @param message the message taken from the exchange body
     * @param l       the value of the {@code insertMsgToQueue} header (a timestamp in milliseconds that is
     *                subtracted from the current time), may be {@code null}; then "-" is logged instead
     */
    @Handler
    public void logStartProcessing(@Body Message message, @Header("insertMsgToQueue") @Nullable Long l) {
        final Object waitedInQueue;
        if (l == null) {
            waitedInQueue = "-";
        } else {
            waitedInQueue = Long.valueOf(System.currentTimeMillis() - l.longValue());
        }

        LOG.debug("Starts processing of the message {}, waited in queue for {} ms", message.toHumanString(), waitedInQueue);
    }

    /**
     * Copies the object id and the entity type of the message into the exchange headers.
     * A value that is {@code null} is not written.
     *
     * @param message the message taken from the exchange body, must not be {@code null}
     * @param map     the exchange headers; receives the {@code asynchObjectId} and {@code entityType} entries
     */
    @Handler
    public void setEntityInfo(@Body Message message, @Headers Map<String, Object> map) {
        Assert.notNull(message, "the msg must not be null");

        final Object objectId = message.getObjectId();
        if (objectId != null) {
            map.put("asynchObjectId", objectId);
        }

        final Object entityType = message.getEntityType();
        if (entityType != null) {
            map.put("entityType", entityType);
        }
    }

    /**
     * Decides whether the message has to be moved to the failed state.
     *
     * <p>The message is considered failed when its failed count reached the configured
     * {@code ohf.asynch.countPartlyFailsBeforeFailed} threshold, or when the given exception
     * has a {@link StoppingException} in its chain.
     *
     * @param message the message taken from the {@code processingMessage} header, must not be {@code null}
     * @param exc     the exception that interrupted processing, may be {@code null}
     * @return {@code true} when the message should be marked as failed, {@code false} otherwise
     */
    @Handler
    public boolean checkMessageFailed(@Header("processingMessage") Message message, @Nullable Exception exc) {
        Assert.notNull(message, "the msg must not be null");
        boolean z = message.getFailedCount() >= ((Integer) this.countPartlyFailsBeforeFailed.getValue()).intValue();
        if (!z && exc != null && ExceptionUtils.indexOfThrowable(exc, StoppingException.class) >= 0) {
            // Log the identity of the message (not the exception text) so the affected message can be found;
            // the exception text is appended as the cause.
            LOG.warn("ESB not processing existing message. Message {} is changed to failed state. Cause: {}",
                    message.toHumanString(), exc.getMessage());
            z = true;
        }
        return z;
    }

    /**
     * Tells whether the processed message is a parent message.
     *
     * @param message the message taken from the {@code processingMessage} header, must not be {@code null}
     * @return the result of {@code message.isParentMessage()}
     */
    @Handler
    public boolean checkParentMessage(@Header("processingMessage") Message message) {
        Assert.notNull(message, "the msg must not be null");

        final boolean parent = message.isParentMessage();
        return parent;
    }

    /**
     * Sends an e-mail to administrators that reports the failed message together with its envelope.
     *
     * @param message the message taken from the {@code processingMessage} header, must not be {@code null}
     */
    @Handler
    public void sendMailToAdmin(@Header("processingMessage") Message message) {
        Assert.notNull(message, "the msg must not be null");

        final EmailService emailService = (EmailService) lookup("emailService", EmailService.class);
        final String subject = "Notification about FAILED message";
        final String text = "The following message " + message.toHumanString() + " FAILED.\n\nBody:\n" + message.getEnvelope();
        emailService.sendEmailToAdmins(subject, text);
    }

    /**
     * Builds the URI of the route that processes the given message.
     *
     * @param message the message taken from the {@code processingMessage} header, must not be {@code null}
     * @return a URI in the form {@code direct:<serviceName>_<operationName>_out_route}
     */
    @Handler
    public String nextRoute(@Header("processingMessage") Message message) {
        Assert.notNull(message, "the msg must not be null");

        final StringBuilder uri = new StringBuilder("direct:");
        uri.append(message.getService().getServiceName());
        uri.append("_");
        uri.append(message.getOperationName());
        uri.append("_out_route");
        return uri.toString();
    }

    /**
     * Switches the message into the {@link MsgStateEnum#IN_QUEUE} state.
     *
     * @param message the message taken from the exchange body, must not be {@code null}
     * @throws LockFailureException when {@code setStateInQueueForLock} returns {@code false}
     */
    @Handler
    public void changeStateMessageToInQueue(@Body Message message) throws LockFailureException {
        Assert.notNull(message, "msg must not be null");
        LOG.debug("Change state of message '{}' into '{}.'", message.toHumanString(), MsgStateEnum.IN_QUEUE);

        final MessageService messageService = (MessageService) getBean(MessageService.class);
        final boolean locked = messageService.setStateInQueueForLock(message);
        if (locked) {
            return;
        }

        throw new LockFailureException("Failed to lock message for change state to '" + MsgStateEnum.IN_QUEUE + "': " + message.toHumanString());
    }

    /**
     * Verifies that the actual node is able to handle existing messages.
     *
     * @throws StoppingException when the actual node reports that it is not able to handle existing messages
     */
    @Handler
    public void isAbleToHandleExistingMessage() throws StoppingException {
        final NodeService nodeService = (NodeService) getApplicationContext().getBean(NodeService.class);
        final boolean ableToHandle = nodeService.getActualNode().isAbleToHandleExistingMessages();
        if (ableToHandle) {
            return;
        }

        throw new StoppingException("ESB has been stopped...");
    }
}
