package org.openhubframework.openhub.core.common.asynch.msg;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Body;
import org.apache.camel.Handler;
import org.apache.camel.Header;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.model.ModelCamelContext;
import org.openhubframework.openhub.api.asynch.msg.ChildMessage;
import org.openhubframework.openhub.api.asynch.msg.MessageSplitterCallback;
import org.openhubframework.openhub.api.asynch.msg.MsgSplitter;
import org.openhubframework.openhub.api.entity.Message;
import org.openhubframework.openhub.api.entity.MsgStateEnum;
import org.openhubframework.openhub.api.exception.LockFailureException;
import org.openhubframework.openhub.core.common.asynch.AsynchMessageRoute;
import org.openhubframework.openhub.spi.msg.MessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:org/openhubframework/openhub/core/common/asynch/msg/MessageSplitterImpl.class */
public final class MessageSplitterImpl implements MsgSplitter {
    private static final Logger LOG = LoggerFactory.getLogger(MessageSplitterImpl.class);
    private final ModelCamelContext camelCtx;
    private final MessageService messageService;
    private final ExecutorService executor;
    private final MessageSplitterCallback splitterCallback;

    public MessageSplitterImpl(MessageService messageService, ModelCamelContext modelCamelContext, MessageSplitterCallback messageSplitterCallback) {
        Assert.notNull(messageService, "the messageService must not be null");
        Assert.notNull(modelCamelContext, "the camelCtx must not be null");
        Assert.notNull(messageSplitterCallback, "the splitterCallback must not be null");
        this.camelCtx = modelCamelContext;
        this.messageService = messageService;
        this.splitterCallback = messageSplitterCallback;
        this.executor = modelCamelContext.getExecutorServiceManager().newThreadPool(this, "MessageSplitter", 1, 3);
    }

    @Handler
    public final void splitMessage(@Header("processingMessage") Message message, @Body Object obj) {
        Assert.notNull(message, "the parentMsg must not be null");
        Assert.isTrue(message.getState() == MsgStateEnum.PROCESSING && message.getFailedCount() == 0, "only new message can be split to child messages");
        List childMessages = this.splitterCallback.getChildMessages(message, obj);
        LOG.debug("Count of child messages: " + childMessages.size());
        final ArrayList arrayList = new ArrayList(childMessages.size());
        for (int i = 0; i < childMessages.size(); i++) {
            Message createMessage = ChildMessage.createMessage((ChildMessage) childMessages.get(i));
            createMessage.setCorrelationId(String.valueOf(message.getCorrelationId()) + "_" + i);
            arrayList.add(createMessage);
        }
        message.setParentMessage(true);
        this.messageService.setStateWaiting(message);
        this.messageService.insertMessages(arrayList);
        final ProducerTemplate createProducerTemplate = this.camelCtx.createProducerTemplate();
        try {
            this.executor.submit(new Runnable() { // from class: org.openhubframework.openhub.core.common.asynch.msg.MessageSplitterImpl.1
                @Override // java.lang.Runnable
                public void run() {
                    for (Message message2 : arrayList) {
                        MessageSplitterImpl.LOG.debug("Message " + message2.toHumanString() + " will be processed ...");
                        if (!MessageSplitterImpl.this.messageService.setStateInQueueForLock(message2)) {
                            throw new LockFailureException("Failed to lock message for change state to '" + MsgStateEnum.IN_QUEUE + "': " + message2.toHumanString());
                        }
                        createProducerTemplate.requestBody(AsynchMessageRoute.URI_SYNC_MSG, message2);
                        MessageSplitterImpl.LOG.debug("Message " + message2.toHumanString() + " was successfully processed.");
                    }
                }
            });
            if (createProducerTemplate != null) {
                try {
                    createProducerTemplate.stop();
                } catch (Exception e) {
                    LOG.error("error occurred during stopping producerTemplate", e);
                }
            }
        } catch (Throwable th) {
            if (createProducerTemplate != null) {
                try {
                    createProducerTemplate.stop();
                } catch (Exception e2) {
                    LOG.error("error occurred during stopping producerTemplate", e2);
                }
            }
            throw th;
        }
    }
}
