package com.github.jesse.l2cache.sync;

import cn.hutool.core.util.StrUtil;
import com.github.jesse.l2cache.CacheConfig;
import com.github.jesse.l2cache.util.ObjectMapperUtil;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/jesse/l2cache/sync/KafkaCacheSyncPolicy.class */
/**
 * Cache synchronisation policy backed by Kafka.
 *
 * <p>{@link #publish(CacheMessage)} serialises a {@link CacheMessage} to JSON and sends it to the
 * configured topic. {@link #connnect()} starts a background thread that polls the same topic and
 * hands every received message to the listener returned by {@code getCacheMessageListener()}.
 *
 * <p>A random suffix is appended to the consumer {@code group.id} on every connect so that each
 * connected instance consumes with its own group id.
 *
 * <p>Lifecycle methods ({@link #connnect()} / {@link #disconnect()}) are synchronized on this
 * instance; {@link #publish(CacheMessage)} may be called from any thread.
 */
public class KafkaCacheSyncPolicy extends AbstractCacheSyncPolicy {
    private static final Logger logger = LoggerFactory.getLogger(KafkaCacheSyncPolicy.class);

    /** {@code true} while this policy is connected; guards against double connects. */
    AtomicBoolean start = new AtomicBoolean(false);

    private volatile KafkaProducer<String, String> producer;
    private volatile KafkaConsumer<String, String> consumer;

    /**
     * Run flag of the current poll thread. A fresh flag is created per connection so that a poll
     * thread from a previous connection can never be kept alive by a later reconnect.
     */
    private AtomicBoolean polling;

    /** The {@code group.id} as configured, before any random suffix was appended. */
    private String baseGroupId;

    /**
     * Creates the Kafka producer and consumer and starts the background poll thread.
     *
     * <p>Does nothing (apart from logging) when the policy is already connected. If creating the
     * clients fails, everything created so far is closed, the started flag is reset so a later
     * call can retry, and the original exception is rethrown.
     */
    @Override // com.github.jesse.l2cache.CacheSyncPolicy
    public synchronized void connnect() {
        if (!this.start.compareAndSet(false, true)) {
            logger.info("already started");
            return;
        }
        KafkaProducer<String, String> newProducer = null;
        KafkaConsumer<String, String> newConsumer = null;
        try {
            CacheConfig.CacheSyncPolicy cacheSyncPolicy = getCacheConfig().getCacheSyncPolicy();
            genConsumerGroupName(cacheSyncPolicy);
            newProducer = new KafkaProducer<>(cacheSyncPolicy.getProps());
            newConsumer = new KafkaConsumer<>(cacheSyncPolicy.getProps());

            final String topic = cacheSyncPolicy.getTopic();
            final AtomicBoolean active = new AtomicBoolean(true);
            final KafkaConsumer<String, String> pollConsumer = newConsumer;

            this.producer = newProducer;
            this.consumer = newConsumer;
            this.polling = active;

            new Thread(() -> pollLoop(pollConsumer, topic, active), "l2cache-kafka-sync-consumer").start();
        } catch (RuntimeException e) {
            // Roll back so that the policy is not left half-initialised and can be connected again.
            this.producer = null;
            this.consumer = null;
            this.polling = null;
            closeQuietly(newConsumer);
            closeQuietly(newProducer);
            this.start.set(false);
            throw e;
        }
    }

    /**
     * Poll loop executed on the background thread. Owns {@code kafkaConsumer}: apart from
     * {@code wakeup()}, the consumer is only touched (and finally closed) by this thread.
     *
     * @param kafkaConsumer consumer to poll
     * @param topic         topic to subscribe to
     * @param active        run flag; the loop ends once it becomes {@code false}
     */
    private void pollLoop(KafkaConsumer<String, String> kafkaConsumer, String topic, AtomicBoolean active) {
        try {
            kafkaConsumer.subscribe(Collections.singletonList(topic));
            while (active.get()) {
                try {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(3L));
                    logger.debug("poll messages, topic={}, records={}", topic, records.count());
                    for (ConsumerRecord<String, String> record : records) {
                        dispatch(topic, record);
                    }
                    kafkaConsumer.commitSync();
                } catch (Exception e) {
                    if (!active.get()) {
                        // Expected: disconnect() woke the consumer up to stop this loop.
                        break;
                    }
                    logger.error("poll message deal error", e);
                }
            }
        } finally {
            closeQuietly(kafkaConsumer);
        }
    }

    /**
     * Deserialises one record and passes it to the cache message listener. Failures are logged
     * and swallowed so that one bad message does not discard the rest of the polled batch.
     */
    private void dispatch(String topic, ConsumerRecord<String, String> record) {
        try {
            logger.debug("received a message, topic={}, record={}", topic, record);
            getCacheMessageListener().onMessage((CacheMessage) ObjectMapperUtil.toObject(record.value(), CacheMessage.class));
        } catch (Exception e) {
            logger.error("poll message deal error, record={}", record, e);
        }
    }

    /**
     * Publishes a cache message to the configured topic, asynchronously or synchronously depending
     * on {@code isAsync()} of the sync policy configuration. Errors are logged, never thrown.
     *
     * @param cacheMessage message to serialise to JSON and send
     */
    @Override // com.github.jesse.l2cache.CacheSyncPolicy
    public void publish(CacheMessage cacheMessage) {
        CacheConfig.CacheSyncPolicy cacheSyncPolicy = getCacheConfig().getCacheSyncPolicy();
        KafkaProducer<String, String> kafkaProducer = this.producer;
        if (kafkaProducer == null) {
            logger.error("publish cache sync message error, producer is not connected");
            return;
        }
        try {
            String json = ObjectMapperUtil.toJson(cacheMessage);
            logger.info("publish cache sync message, message={}", json);
            ProducerRecord<String, String> record = new ProducerRecord<>(cacheSyncPolicy.getTopic(), json);
            if (cacheSyncPolicy.isAsync()) {
                kafkaProducer.send(record, (recordMetadata, exc) -> {
                    if (recordMetadata != null) {
                        logger.debug("sent to partition({}), offset({}), message({}) ", recordMetadata.partition(), recordMetadata.offset(), json);
                    } else {
                        logger.error("async publish cache message error", exc);
                    }
                });
                logger.debug("async publish cache message");
            } else {
                RecordMetadata metadata = kafkaProducer.send(record).get();
                logger.info("publish topic={}, RecordMetadata={}", cacheSyncPolicy.getTopic(), metadata.toString());
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            logger.error("publish cache sync message error", e);
        } catch (Exception e) {
            logger.error("publish cache sync message error", e);
        }
    }

    /**
     * Stops the poll thread and closes the producer. The consumer is closed by the poll thread
     * itself once it observes the stop request. Does nothing when the policy is not connected;
     * afterwards {@link #connnect()} may be called again.
     */
    @Override // com.github.jesse.l2cache.CacheSyncPolicy
    public synchronized void disconnect() {
        if (!this.start.get()) {
            return;
        }
        if (this.polling != null) {
            this.polling.set(false);
        }
        KafkaConsumer<String, String> kafkaConsumer = this.consumer;
        if (kafkaConsumer != null) {
            try {
                // wakeup() aborts a blocking poll() so the poll thread can exit promptly.
                kafkaConsumer.wakeup();
            } catch (Exception e) {
                logger.error("wakeup kafka consumer error", e);
            }
        }
        closeQuietly(this.producer);
        this.producer = null;
        this.consumer = null;
        this.polling = null;
        this.start.set(false);
    }

    /** Closes {@code resource} if it is non-null, logging instead of propagating any failure. */
    private void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            logger.error("close kafka client error", e);
        }
    }

    /**
     * Writes {@code group.id} = configured group id (default {@code l2cacheGroup}) + random
     * suffix into the policy's properties. The configured value is remembered on first use so
     * that reconnecting does not pile suffix upon suffix.
     */
    private void genConsumerGroupName(CacheConfig.CacheSyncPolicy cacheSyncPolicy) {
        if (this.baseGroupId == null) {
            this.baseGroupId = cacheSyncPolicy.getProps().getProperty("group.id", "l2cacheGroup");
        }
        cacheSyncPolicy.getProps().put("group.id", this.baseGroupId + genRandomSrc());
    }

    /**
     * Generates the numeric group-id suffix: a random value in {@code [0, 10000)} times 1000, plus
     * the millisecond part of the current time. The result is at most 9,999,999.
     *
     * <p>The generator is deliberately not seeded with the clock: instances started within the
     * same millisecond would otherwise produce identical suffixes.
     */
    private int genRandomSrc() {
        long now = System.currentTimeMillis();
        return (int) ((new Random().nextInt(10000) * 1000) + (now % 1000));
    }

    /** Copies the producer-related settings that are present in {@code properties} into a new object. */
    private Properties buildProducerProps(Properties properties) {
        Properties properties2 = new Properties();
        setProp(properties, properties2, "bootstrap.servers");
        setProp(properties, properties2, "client.id");
        setProp(properties, properties2, "key.serializer");
        setProp(properties, properties2, "value.serializer");
        setProp(properties, properties2, "acks");
        return properties2;
    }

    /** Copies the consumer-related settings that are present in {@code properties} into a new object. */
    private Properties buildConsumerProps(Properties properties) {
        Properties properties2 = new Properties();
        setProp(properties, properties2, "bootstrap.servers");
        setProp(properties, properties2, "group.id");
        setProp(properties, properties2, "enable.auto.commit");
        setProp(properties, properties2, "auto.commit.interval.ms");
        setProp(properties, properties2, "session.timeout.ms");
        setProp(properties, properties2, "key.deserializer");
        setProp(properties, properties2, "value.deserializer");
        setProp(properties, properties2, "auto.offset.reset");
        setProp(properties, properties2, "max.poll.records");
        setProp(properties, properties2, "max.poll.interval.ms");
        return properties2;
    }

    /** Copies property {@code str} from {@code properties} to {@code properties2} if it is not blank. */
    private void setProp(Properties properties, Properties properties2, String str) {
        setProp(properties, properties2, str, null);
    }

    /**
     * Copies property {@code str} from {@code properties} to {@code properties2}, falling back to
     * {@code str2} when it is absent; nothing is written when the resulting value is blank.
     */
    private void setProp(Properties properties, Properties properties2, String str, String str2) {
        String property = properties.getProperty(str, str2);
        if (StrUtil.isBlank(property)) {
            return;
        }
        properties2.put(str, property);
    }
}
