package org.eclipse.dirigible.components.api.kafka;

import java.text.MessageFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.WakeupException;
import org.eclipse.dirigible.commons.api.helpers.GsonHelper;
import org.eclipse.dirigible.components.engine.javascript.service.JavascriptService;
import org.eclipse.dirigible.repository.api.RepositoryPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/dirigible/components/api/kafka/KafkaConsumerRunner.class */
public class KafkaConsumerRunner implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunner.class);
    private static final String DIRIGIBLE_MESSAGING_WRAPPER_MODULE_ON_MESSAGE = "messaging/wrappers/onMessage";
    private static final String DIRIGIBLE_MESSAGING_WRAPPER_MODULE_ON_ERROR = "messaging/wrappers/onError";
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final Consumer consumer;
    private final String name;
    private final String handler;
    private int timeout;

    @Autowired
    private JavascriptService javascriptService;

    public KafkaConsumerRunner(Consumer consumer, String str, String str2, int i) {
        this.timeout = 1000;
        this.consumer = consumer;
        this.name = str;
        this.handler = str2;
        this.timeout = i;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Starting a Kafka listener for {} ...", this.name);
                }
                this.consumer.subscribe(Arrays.asList(this.name));
                while (!this.stopped.get()) {
                    Iterator it = this.consumer.poll(Duration.ofMillis(this.timeout)).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        if (logger.isTraceEnabled()) {
                            logger.trace(MessageFormat.format("Start processing a received record in [{0}] by [{1}] ...", this.name, this.handler));
                        }
                        if (this.handler != null) {
                            Map<Object, Object> createMessagingContext = createMessagingContext();
                            createMessagingContext.put("message", escapeCodeString(GsonHelper.toJson(consumerRecord)));
                            try {
                                RepositoryPath repositoryPath = new RepositoryPath(DIRIGIBLE_MESSAGING_WRAPPER_MODULE_ON_MESSAGE);
                                JavascriptService.get().handleRequest(repositoryPath.getSegments()[0], repositoryPath.constructPathFrom(1), (String) null, createMessagingContext, false);
                            } catch (Exception e) {
                                if (logger.isErrorEnabled()) {
                                    logger.error(e.getMessage(), e);
                                }
                                try {
                                    createMessagingContext.put("error", escapeCodeString(e.getMessage()));
                                    RepositoryPath repositoryPath2 = new RepositoryPath(DIRIGIBLE_MESSAGING_WRAPPER_MODULE_ON_ERROR);
                                    this.javascriptService.handleRequest(repositoryPath2.getSegments()[0], repositoryPath2.constructPathFrom(1), (String) null, createMessagingContext, false);
                                } catch (Exception e2) {
                                    if (logger.isErrorEnabled()) {
                                        logger.error(e2.getMessage(), e2);
                                    }
                                }
                            }
                        } else if (logger.isInfoEnabled()) {
                            logger.info(String.format("[Kafka Consumer] %s -  offset = %d, key = %s, value = %s%n", this.name, Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()));
                        }
                        if (logger.isTraceEnabled()) {
                            logger.trace(MessageFormat.format("Done processing the received record in [{0}] by [{1}]", this.name, this.handler));
                        }
                    }
                }
                this.consumer.close();
            } catch (Throwable th) {
                this.consumer.close();
                throw th;
            }
        } catch (WakeupException e3) {
            if (!this.stopped.get()) {
                throw e3;
            }
            this.consumer.close();
        }
    }

    public void stop() {
        this.stopped.set(true);
        this.consumer.wakeup();
    }

    private Map<Object, Object> createMessagingContext() {
        HashMap hashMap = new HashMap();
        hashMap.put("handler", this.handler);
        return hashMap;
    }

    private String escapeCodeString(String str) {
        return str.replace("'", "&amp;");
    }
}
