package com.github.jesse.l2cache.sync;

import com.github.jesse.l2cache.L2CacheConfig;
import com.github.jesse.l2cache.L2CacheConfigUtil;
import com.github.jesse.l2cache.consts.CacheConsts;
import com.github.jesse.l2cache.content.RedissonSupport;
import com.github.jesse.l2cache.util.pool.RunnableMdcWarpper;
import com.github.jesse.l2cache.util.pool.ThreadPoolSupport;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.api.RLock;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/jesse/l2cache/sync/RedisCacheSyncPolicy.class */
/**
 * Cache synchronisation policy that broadcasts {@link CacheMessage}s through a Redisson
 * {@link RTopic}.
 *
 * <p>{@link #connnect()} subscribes to the configured topic and forwards every received message
 * to the listener returned by {@code getCacheMessageListener()}. {@link #publish(CacheMessage)}
 * sends messages asynchronously on a shared thread pool and uses a Redisson lock with a lease
 * time as a throttle, so that the same (cacheName, key, optType) is published at most once per
 * configured period.
 */
public class RedisCacheSyncPolicy extends AbstractCacheSyncPolicy {
    /** Guards {@link #connnect()} so the topic subscription is only set up once. */
    AtomicBoolean start = new AtomicBoolean(false);
    /** Written by the connecting thread, read by pool threads in {@link #publish}; hence volatile. */
    private volatile RTopic topic;
    private static final Logger logger = LoggerFactory.getLogger(RedisCacheSyncPolicy.class);
    private static final ThreadPoolExecutor poolExecutor = ThreadPoolSupport.getPool("publish_redis_msg");

    /**
     * Subscribes to the configured sync topic. Subsequent calls after a successful connect only
     * log "already started".
     *
     * <p>If creating the topic or registering the listener fails, the started flag is rolled back
     * so that a later call can retry; the exception is rethrown to the caller.
     */
    @Override // com.github.jesse.l2cache.CacheSyncPolicy
    public void connnect() {
        if (!this.start.compareAndSet(false, true)) {
            logger.info("already started");
            return;
        }
        try {
            L2CacheConfig config = getL2CacheConfig();
            RTopic syncTopic = getRedissonClient(config).getTopic(config.getCacheSyncPolicy().getTopic());
            // The listener is resolved on every message rather than captured once.
            syncTopic.addListener(CacheMessage.class,
                    (channel, message) -> getCacheMessageListener().onMessage(message));
            // Publish the field only once the subscription is fully set up.
            this.topic = syncTopic;
        } catch (RuntimeException | Error e) {
            // Without this rollback a failed connect would leave start == true with no topic,
            // and every retry would be rejected as "already started".
            this.start.set(false);
            throw e;
        }
    }

    /**
     * Asynchronously publishes {@code cacheMessage} to the sync topic.
     *
     * <p>The task tries to take a Redisson lock keyed by cache name, key and operation type with
     * zero wait time and a lease of {@code publishMsgPeriodMilliSeconds}. The lock is never
     * unlocked here; it is released only when the lease expires, which makes it act as a
     * per-key throttle. If the lock cannot be taken, or the current thread already holds it
     * (hold count &gt; 1), the message is skipped. Failures are logged, never propagated.
     *
     * @param cacheMessage the message to broadcast
     */
    @Override // com.github.jesse.l2cache.CacheSyncPolicy
    public void publish(CacheMessage cacheMessage) {
        poolExecutor.execute(new RunnableMdcWarpper(() -> {
            try {
                RTopic currentTopic = this.topic;
                if (currentTopic == null) {
                    // Bail out before taking the throttle lock: acquiring it and then failing
                    // would suppress further publishes of this key for the whole lease period.
                    logger.error("publish skipped, topic is not initialized (connnect() not called or failed), cacheName={}, key={}", cacheMessage.getCacheName(), cacheMessage.getKey());
                    return;
                }
                Long publishMsgPeriodMilliSeconds = L2CacheConfigUtil.getCacheConfig(getL2CacheConfig(), cacheMessage.getCacheName()).getCaffeine().getPublishMsgPeriodMilliSeconds();
                RLock lock = getRedissonClient(getL2CacheConfig()).getLock(buildLockKey(cacheMessage));
                // waitTime = 0: do not block; leaseTime = throttle window.
                if (!lock.tryLock(0L, publishMsgPeriodMilliSeconds.longValue(), TimeUnit.MILLISECONDS)) {
                    logger.warn("trylock fail, no need to publish message, publishMsgPeriod={}ms, message={}", publishMsgPeriodMilliSeconds, cacheMessage.toString());
                    return;
                }
                int holdCount = lock.getHoldCount();
                if (holdCount > 1) {
                    // Reentrant acquisition: this thread already holds the lock from an earlier publish.
                    logger.warn("trylock succ, no need to publish message, publishMsgPeriod={}ms, lockHoldCount={}, message={}", publishMsgPeriodMilliSeconds, holdCount, cacheMessage.toString());
                    return;
                }
                long receivedClients = currentTopic.publish(cacheMessage);
                logger.info("publish succ, cacheName={}, key={}, receivedMsgClientNum={}, message={}", cacheMessage.getCacheName(), cacheMessage.getKey(), receivedClients, cacheMessage.toString());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the pool worker can observe the interruption.
                Thread.currentThread().interrupt();
                logger.error("publish error, cacheName=" + cacheMessage.getCacheName() + ", key=" + cacheMessage.getKey(), e);
            } catch (Exception e) {
                logger.error("publish error, cacheName=" + cacheMessage.getCacheName() + ", key=" + cacheMessage.getKey(), e);
            }
        }, cacheMessage));
    }

    /**
     * Currently a no-op: the topic listener registered in {@link #connnect()} is not removed and
     * the started flag is not reset.
     */
    @Override // com.github.jesse.l2cache.CacheSyncPolicy
    public void disconnect() {
        // NOTE(review): intentionally left empty to preserve existing behaviour; consider
        // removing the topic listener here.
    }

    /**
     * Returns the Redisson client to use: the explicitly set client from
     * {@code getActualClient()} if there is one, otherwise the instance obtained from
     * {@link RedissonSupport#getRedisson(L2CacheConfig)}.
     *
     * @param l2CacheConfig configuration passed to {@code RedissonSupport} when no client was set
     * @return the Redisson client
     */
    protected RedissonClient getRedissonClient(L2CacheConfig l2CacheConfig) {
        RedissonClient actualClient = getActualClient();
        if (actualClient != null) {
            logger.debug("use setting RedissonClient instance");
            return actualClient;
        }
        logger.info("get or create RedissonClient instance by cache config");
        return RedissonSupport.getRedisson(l2CacheConfig);
    }

    /**
     * Builds the throttle-lock key: {@code "lock"}, cache name, key and operation type joined
     * by {@link CacheConsts#SPLIT}.
     */
    private String buildLockKey(CacheMessage cacheMessage) {
        return "lock" + CacheConsts.SPLIT + cacheMessage.getCacheName() + CacheConsts.SPLIT + cacheMessage.getKey() + CacheConsts.SPLIT + cacheMessage.getOptType();
    }
}
