package org.eclipse.dirigible.core.messaging.service;

import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.eclipse.dirigible.commons.api.scripting.ScriptingException;
import org.eclipse.dirigible.core.messaging.api.MessagingException;
import org.eclipse.dirigible.engine.api.script.ScriptEngineExecutorsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/dirigible-core-messaging-3.5.1.jar:org/eclipse/dirigible/core/messaging/service/MessagingConsumer.class */
/**
 * Consumes text messages from a JMS queue or topic on the in-VM ActiveMQ broker.
 *
 * <p>The consumer works in one of two modes, selected by whether a handler was supplied:
 * <ul>
 * <li><b>Synchronous</b> (no handler): {@link #receiveMessage()} waits up to the configured
 * timeout for a single message and returns its text.</li>
 * <li><b>Background</b> (handler set): {@link #run()} keeps polling the destination and passes
 * every received text to the {@code messaging/wrappers/onMessage} script module until
 * {@link #stop()} is called or the running thread is interrupted.</li>
 * </ul>
 *
 * <p>JMS connection errors are reported through {@link #onException(JMSException)}, which
 * forwards them to the {@code messaging/wrappers/onError} script module.
 */
public class MessagingConsumer implements Runnable, ExceptionListener {
    private static final Logger logger = LoggerFactory.getLogger(MessagingConsumer.class);
    private static final String DIRIGIBLE_MESSAGING_WRAPPER_MODULE_ON_MESSAGE = "messaging/wrappers/onMessage";
    private static final String DIRIGIBLE_MESSAGING_WRAPPER_MODULE_ON_ERROR = "messaging/wrappers/onError";
    private static final String BROKER_URL = "vm://localhost?create=false";
    private static final String SCRIPT_ENGINE_TYPE = "javascript";
    private static final char TYPE_QUEUE = 'Q';
    private static final char TYPE_TOPIC = 'T';

    private final String name;
    private final char type;
    private final String handler;
    private final int timeout;

    // Written by stop() and read by the polling loops, normally from different threads.
    private volatile boolean stopped;

    /**
     * Creates a background consumer that dispatches received messages to a handler.
     *
     * @param name the name of the queue or topic to consume from
     * @param type the destination type: {@code 'Q'} for a queue, {@code 'T'} for a topic
     * @param handler the handler passed to the script wrappers under the {@code "handler"} key;
     *        when {@code null} the consumer behaves as a synchronous one
     * @param timeout the maximum time to wait in a single receive call, in milliseconds
     */
    public MessagingConsumer(String name, char type, String handler, int timeout) {
        this.name = name;
        this.type = type;
        this.handler = handler;
        this.timeout = timeout;
    }

    /**
     * Creates a synchronous consumer without a handler.
     *
     * @param name the name of the queue or topic to consume from
     * @param type the destination type: {@code 'Q'} for a queue, {@code 'T'} for a topic
     * @param timeout the maximum time to wait in a single receive call, in milliseconds
     */
    public MessagingConsumer(String name, char type, int timeout) {
        this(name, type, null, timeout);
    }

    /**
     * Asks the polling loops to finish. The loops notice the request after the current
     * receive call returns, i.e. at the latest after one timeout period.
     */
    public void stop() {
        this.stopped = true;
    }

    /**
     * Runs the background consumer. Does nothing when no handler is set. Otherwise
     * {@link #receiveMessage()} is invoked repeatedly - each invocation opens a fresh
     * connection - until the consumer is stopped or the current thread is interrupted.
     */
    @Override // java.lang.Runnable
    public void run() {
        if (this.handler == null) {
            return;
        }
        while (!this.stopped && !Thread.currentThread().isInterrupted()) {
            receiveMessage();
        }
    }

    /**
     * Opens a connection to the broker and consumes from the configured destination.
     *
     * <p>Without a handler a single message is awaited for at most the configured timeout.
     * With a handler the method blocks, dispatching messages to the handler, until
     * {@link #stop()} is called. Any failure is logged and swallowed.
     *
     * @return the text of the received message in synchronous mode; {@code null} when nothing
     *         arrived in time, the message was not a {@link TextMessage}, an error occurred,
     *         or the consumer runs in handler mode
     */
    public String receiveMessage() {
        try {
            logger.info("Starting a message listener for {} ...", this.name);
            Connection connection = new ActiveMQConnectionFactory(BROKER_URL).createConnection();
            // From here on the connection exists, so every exit path has to close it.
            try {
                connection.start();
                connection.setExceptionListener(this);
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    MessageConsumer consumer = openConsumer(session);
                    try {
                        if (this.handler == null) {
                            return receiveOnce(consumer);
                        }
                        dispatchUntilStopped(consumer);
                        return null;
                    } finally {
                        consumer.close();
                    }
                } finally {
                    session.close();
                }
            } finally {
                connection.close();
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return null;
        }
    }

    /**
     * Creates a consumer on the configured queue or topic.
     *
     * @param session the session used to create the destination and the consumer
     * @return a consumer bound to the configured destination
     * @throws JMSException if the destination or the consumer cannot be created
     * @throws MessagingException if the destination type is neither {@code 'Q'} nor {@code 'T'}
     */
    private MessageConsumer openConsumer(Session session) throws JMSException, MessagingException {
        if (this.type == TYPE_QUEUE) {
            return session.createConsumer(session.createQueue(this.name));
        }
        if (this.type == TYPE_TOPIC) {
            return session.createConsumer(session.createTopic(this.name));
        }
        throw new MessagingException("Invalid Destination Type: " + this.type);
    }

    /**
     * Waits for a single message for at most the configured timeout.
     *
     * @param consumer the consumer to receive from
     * @return the message text, or {@code null} if nothing arrived or it was not a text message
     * @throws JMSException if receiving or reading the message fails
     */
    private String receiveOnce(MessageConsumer consumer) throws JMSException {
        Message message = consumer.receive(this.timeout);
        logger.debug("Received message in [{}] by synchronous consumer.", this.name);
        if (!(message instanceof TextMessage)) {
            return null;
        }
        return ((TextMessage) message).getText();
    }

    /**
     * Polls the consumer until {@link #stop()} is called and hands the text of every received
     * message to the on-message script wrapper.
     *
     * @param consumer the consumer to receive from
     * @throws JMSException if receiving or reading a message fails
     * @throws MessagingException if a message that is not a {@link TextMessage} is received
     * @throws ScriptingException if the script wrapper fails
     */
    private void dispatchUntilStopped(MessageConsumer consumer)
            throws JMSException, MessagingException, ScriptingException {
        while (!this.stopped) {
            Message message = consumer.receive(this.timeout);
            if (message == null) {
                // Timed out without a message; loop again so that the stop flag is re-checked.
                continue;
            }
            logger.trace("Start processing a received message in [{}] by [{}] ...", this.name, this.handler);
            if (!(message instanceof TextMessage)) {
                throw new MessagingException(MessageFormat.format(
                        "Invalid message [{0}] has been received in destination [{1}]", message, this.name));
            }
            Map<Object, Object> context = createMessagingContext();
            context.put("message", escapeCodeString(((TextMessage) message).getText()));
            ScriptEngineExecutorsManager.executeServiceModule(SCRIPT_ENGINE_TYPE,
                    DIRIGIBLE_MESSAGING_WRAPPER_MODULE_ON_MESSAGE, context);
            logger.trace("Done processing the received message in [{}] by [{}]", this.name, this.handler);
        }
    }

    /**
     * Logs a JMS connection error and forwards its message to the on-error script wrapper.
     *
     * @param exception the error reported by the JMS provider
     */
    @Override // javax.jms.ExceptionListener
    public synchronized void onException(JMSException exception) {
        // Log first so that the JMS error is recorded even if the script wrapper fails.
        logger.error(exception.getMessage(), exception);
        try {
            Map<Object, Object> context = createMessagingContext();
            context.put("error", escapeCodeString(exception.getMessage()));
            ScriptEngineExecutorsManager.executeServiceModule(SCRIPT_ENGINE_TYPE,
                    DIRIGIBLE_MESSAGING_WRAPPER_MODULE_ON_ERROR, context);
        } catch (ScriptingException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Builds the execution context handed to the script wrappers.
     *
     * @return a new mutable map holding the handler under the {@code "handler"} key
     */
    private Map<Object, Object> createMessagingContext() {
        Map<Object, Object> context = new HashMap<Object, Object>();
        context.put("handler", this.handler);
        return context;
    }

    /**
     * Replaces every single quote in the given text with {@code &amp;}.
     *
     * @param str the raw text; may be {@code null}
     * @return the text without single quotes, or an empty string when {@code str} is {@code null}
     */
    private String escapeCodeString(String str) {
        if (str == null) {
            // Exception messages and text message bodies may legitimately be null.
            return "";
        }
        return str.replace("'", "&amp;");
    }
}
