package org.eclipse.dirigible.components.api.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.dirigible.commons.api.helpers.GsonHelper;
import org.eclipse.dirigible.commons.config.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Static facade over the Apache Kafka client: sends records through cached producers and
 * starts/stops background consumer runners.
 *
 * <p>Producers are cached per {@code bootstrap.servers} value; consumer runners are cached per
 * "[servers]:[destination]" location. Each method accepts an optional JSON object whose entries
 * are copied verbatim into the Kafka client properties; any setting it does not provide falls
 * back to the corresponding {@code DIRIGIBLE_KAFKA_*} configuration value or a built-in default.
 */
@Component
public class KafkaFacade {
    private static final String DIRIGIBLE_KAFKA_BOOTSTRAP_SERVER = "DIRIGIBLE_KAFKA_BOOTSTRAP_SERVER";
    private static final String DIRIGIBLE_KAFKA_ACKS = "DIRIGIBLE_KAFKA_ACKS";
    private static final String DIRIGIBLE_KAFKA_KEY_SERIALIZER = "DIRIGIBLE_KAFKA_KEY_SERIALIZER";
    private static final String DIRIGIBLE_KAFKA_VALUE_SERIALIZER = "DIRIGIBLE_KAFKA_VALUE_SERIALIZER";
    private static final String DIRIGIBLE_KAFKA_AUTOCOMMIT_ENABLED = "DIRIGIBLE_KAFKA_AUTOCOMMIT_ENABLED";
    private static final String DIRIGIBLE_KAFKA_AUTOCOMMIT_INTERVAL = "DIRIGIBLE_KAFKA_AUTOCOMMIT_INTERVAL";
    private static final String DEFAULT_BOOTSTRAP_SERVER = "localhost:9092";
    private static final String DIRIGIBLE_KAFKA_ACKS_ALL = "all";
    private static final String DIRIGIBLE_KAFKA_SERIALIZER_STRING = "org.apache.kafka.common.serialization.StringSerializer";
    // Consumers need a Deserializer implementation; defaulting them to StringSerializer makes
    // the KafkaConsumer constructor reject the configuration.
    private static final String DIRIGIBLE_KAFKA_DESERIALIZER_STRING = "org.apache.kafka.common.serialization.StringDeserializer";
    private static final String DIRIGIBLE_KAFKA_AUTOCOMMIT_ENABLED_DEFAULT = "true";
    private static final String DIRIGIBLE_KAFKA_AUTOCOMMIT_INTERVAL_DEFAULT = "1000";
    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    private static final Logger logger = LoggerFactory.getLogger(KafkaFacade.class);

    /** Cached producers, keyed by their {@code bootstrap.servers} value. */
    private static final Map<String, Producer<String, String>> PRODUCERS = Collections.synchronizedMap(new HashMap<>());

    /** Running consumer runners, keyed by the location built in {@link #createLocation}. */
    private static final Map<String, KafkaConsumerRunner> CONSUMERS = Collections.synchronizedMap(new HashMap<>());

    /**
     * Sends a record, creating and caching a producer for the target servers on first use.
     *
     * @param str the topic the record is sent to
     * @param str2 the record key
     * @param str3 the record value
     * @param str4 JSON object with producer properties; {@code null} or blank means "no overrides"
     */
    public static final void send(String str, String str2, String str3, String str4) {
        Map<?, ?> configuration = parseConfiguration(str4);
        String defaultServer = Configuration.get(DIRIGIBLE_KAFKA_BOOTSTRAP_SERVER, DEFAULT_BOOTSTRAP_SERVER);
        String server = resolveBootstrapServer(configuration, defaultServer);

        // computeIfAbsent on the synchronized map is atomic, so concurrent callers cannot
        // create (and leak) two producers for the same servers.
        Producer<String, String> producer =
                PRODUCERS.computeIfAbsent(server, key -> createProducer(configuration, defaultServer, key));

        // send() is asynchronous; without a callback a failed delivery would go unnoticed.
        producer.send(new ProducerRecord<>(str, str2, str3), (metadata, exception) -> {
            if (exception != null) {
                logger.error("Kafka Producer [{}] failed to send a record to [{}].", server, str, exception);
            }
        });
    }

    /**
     * Closes and forgets the cached producer for the servers named in the configuration.
     * Logs a warning when no such producer exists.
     *
     * @param str JSON object with producer properties (only {@code bootstrap.servers} is read);
     *        {@code null} or blank selects the configured default servers
     */
    public static final void closeProducer(String str) {
        Map<?, ?> configuration = parseConfiguration(str);
        String server = resolveBootstrapServer(configuration,
                Configuration.get(DIRIGIBLE_KAFKA_BOOTSTRAP_SERVER, DEFAULT_BOOTSTRAP_SERVER));

        // Remove before closing so no other caller can fetch a producer that is being closed,
        // and so a failing close() does not leave a dead producer in the cache.
        Producer<String, String> producer = PRODUCERS.remove(server);
        if (producer != null) {
            producer.close();
        } else {
            logger.warn("Kafka Producer [{}] has not been started yet.", server);
        }
    }

    /**
     * Starts a background consumer runner for the given destination unless one is already
     * registered for the same servers/destination pair.
     *
     * @param str the destination name; also the fallback {@code group.id} when {@code str2} is
     *        {@code null} (presumably the topic to subscribe to - handled by KafkaConsumerRunner)
     * @param str2 passed to the runner and used as the default {@code group.id}
     *        (presumably the handler to invoke - confirm against KafkaConsumerRunner)
     * @param i passed to the runner unchanged (presumably a poll timeout - confirm)
     * @param str3 JSON object with consumer properties; {@code null} or blank means "no overrides"
     */
    public static final void startListening(String str, String str2, int i, String str3) {
        Map<?, ?> configuration = parseConfiguration(str3);
        String defaultServer = Configuration.get(DIRIGIBLE_KAFKA_BOOTSTRAP_SERVER, DEFAULT_BOOTSTRAP_SERVER);
        String location = createLocation(str, resolveBootstrapServer(configuration, defaultServer));

        // The existence check and the registration must be one atomic step, otherwise two
        // callers could both start a runner for the same location.
        synchronized (CONSUMERS) {
            if (CONSUMERS.containsKey(location)) {
                logger.warn("Kafka Consumer [{}] has already been started.", location);
                return;
            }

            Properties properties = toProperties(configuration);
            if (properties.get(BOOTSTRAP_SERVERS) == null) {
                properties.put(BOOTSTRAP_SERVERS, defaultServer);
            }
            if (properties.get("group.id") == null) {
                properties.put("group.id", str2 != null ? str2 : str);
            }
            applyDefault(properties, "enable.auto.commit", DIRIGIBLE_KAFKA_AUTOCOMMIT_ENABLED,
                    DIRIGIBLE_KAFKA_AUTOCOMMIT_ENABLED_DEFAULT);
            applyDefault(properties, "auto.commit.interval.ms", DIRIGIBLE_KAFKA_AUTOCOMMIT_INTERVAL,
                    DIRIGIBLE_KAFKA_AUTOCOMMIT_INTERVAL_DEFAULT);
            // NOTE(review): the deserializer class is still read from the *_SERIALIZER
            // configuration keys (unchanged for compatibility); only the built-in default was
            // corrected to a Deserializer. Consider introducing dedicated *_DESERIALIZER keys.
            applyDefault(properties, "key.deserializer", DIRIGIBLE_KAFKA_KEY_SERIALIZER,
                    DIRIGIBLE_KAFKA_DESERIALIZER_STRING);
            applyDefault(properties, "value.deserializer", DIRIGIBLE_KAFKA_VALUE_SERIALIZER,
                    DIRIGIBLE_KAFKA_DESERIALIZER_STRING);

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            KafkaConsumerRunner runner = new KafkaConsumerRunner(consumer, str, str2, i);
            Thread thread = new Thread(runner);
            thread.setDaemon(false);
            thread.start();
            CONSUMERS.put(location, runner);
            logger.info("Kafka Consumer [{}] created.", location);
        }
    }

    /**
     * Stops and forgets the consumer runner registered for the given destination and servers.
     * Logs a warning when no such runner exists.
     *
     * @param str the destination name used when the runner was started
     * @param str2 JSON object with consumer properties (only {@code bootstrap.servers} is read);
     *        {@code null} or blank selects the configured default servers
     */
    public static final void stopListening(String str, String str2) {
        Map<?, ?> configuration = parseConfiguration(str2);
        String location = createLocation(str, resolveBootstrapServer(configuration,
                Configuration.get(DIRIGIBLE_KAFKA_BOOTSTRAP_SERVER, DEFAULT_BOOTSTRAP_SERVER)));

        // Unregister first so a concurrent startListening can register a fresh runner even if
        // stop() fails.
        KafkaConsumerRunner runner = CONSUMERS.remove(location);
        if (runner != null) {
            runner.stop();
        } else {
            logger.warn("Kafka Consumer [{}] has not been started yet.", location);
        }
    }

    /** Builds the consumer registry key "[servers]:[destination]". */
    private static String createLocation(String str, String str2) {
        return "[" + str2 + "]:[" + str + "]";
    }

    /** Parses the JSON configuration; null, blank, or a JSON value parsed as null yields an empty map. */
    private static Map<?, ?> parseConfiguration(String json) {
        if (json == null || json.trim().isEmpty()) {
            return Collections.emptyMap();
        }
        Map<?, ?> parsed = (Map<?, ?>) GsonHelper.fromJson(json, Map.class);
        if (parsed == null) {
            return Collections.emptyMap();
        }
        return parsed;
    }

    /** Returns the configuration's {@code bootstrap.servers} value, or the given default when absent. */
    private static String resolveBootstrapServer(Map<?, ?> configuration, String defaultServer) {
        Object configured = configuration.get(BOOTSTRAP_SERVERS);
        return configured != null ? configured.toString() : defaultServer;
    }

    /** Copies the configuration entries into Properties, skipping nulls (Properties rejects them). */
    private static Properties toProperties(Map<?, ?> configuration) {
        Properties properties = new Properties();
        for (Map.Entry<?, ?> entry : configuration.entrySet()) {
            if (entry.getKey() != null && entry.getValue() != null) {
                properties.put(entry.getKey(), entry.getValue());
            }
        }
        return properties;
    }

    /** Sets a property from the named configuration value (or the fallback) unless already present. */
    private static void applyDefault(Properties properties, String property, String configurationKey, String fallback) {
        if (properties.get(property) == null) {
            properties.put(property, Configuration.get(configurationKey, fallback));
        }
    }

    /** Creates a producer for the given servers, filling in defaults for unset properties. */
    private static Producer<String, String> createProducer(Map<?, ?> configuration, String defaultServer, String server) {
        Properties properties = toProperties(configuration);
        if (properties.get(BOOTSTRAP_SERVERS) == null) {
            properties.put(BOOTSTRAP_SERVERS, defaultServer);
        }
        applyDefault(properties, "acks", DIRIGIBLE_KAFKA_ACKS, DIRIGIBLE_KAFKA_ACKS_ALL);
        applyDefault(properties, "key.serializer", DIRIGIBLE_KAFKA_KEY_SERIALIZER, DIRIGIBLE_KAFKA_SERIALIZER_STRING);
        applyDefault(properties, "value.serializer", DIRIGIBLE_KAFKA_VALUE_SERIALIZER, DIRIGIBLE_KAFKA_SERIALIZER_STRING);
        Producer<String, String> producer = new KafkaProducer<>(properties);
        logger.info("Kafka Producer [{}] created.", server);
        return producer;
    }
}
