package com.github.ddth.pubsub.impl;

import com.github.ddth.commons.redis.JedisConnector;
import com.github.ddth.commons.redis.JedisUtils;
import com.github.ddth.pubsub.ISubscriber;
import com.github.ddth.queue.IMessage;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.Jedis;

/* loaded from: input_file:com/github/ddth/pubsub/impl/RedisPubSubHub.class */
/**
 * Redis-backed pub/sub hub that talks to a single Redis server through Jedis.
 *
 * <p>{@link #init()} starts a background thread that pattern-subscribes to {@code "*"}, so the hub
 * receives messages from every channel and dispatches each one to the local subscribers registered
 * for that exact channel name via {@link #subscribe(String, ISubscriber)}.
 *
 * <p>Subscriber sets are guarded by their own monitor; subscribers are invoked outside that monitor
 * on the Redis listener thread.
 *
 * @param <ID> message id type
 * @param <DATA> message payload type
 */
public class RedisPubSubHub<ID, DATA> extends BaseRedisPubSubHub<ID, DATA> {
    public static final String DEFAULT_HOST_AND_PORT = "localhost:6379";

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisPubSubHub.class);

    /** Name given to the background thread that holds the Redis subscription. */
    private static final String LISTENER_THREAD_NAME = "RedisPubSubHub-listener";

    /** Upper bound, in milliseconds, that {@link #destroy()} waits for the unsubscribe to complete. */
    private static final long UNSUBSCRIBE_TIMEOUT_MS = 1000L;

    /** Connection dedicated to the subscription; assigned on the listener thread, read in destroy(). */
    private volatile Jedis myPubSubJedis;

    private String redisHostAndPort = DEFAULT_HOST_AND_PORT;

    /** Written by the listener thread, read by arbitrary callers of {@link #isReady()}. */
    private volatile boolean ready = false;

    /** Channel name -> locally registered subscribers. Each set is guarded by its own monitor. */
    private final LoadingCache<String, Set<ISubscriber<ID, DATA>>> subscriptions =
            CacheBuilder.newBuilder().build(new CacheLoader<String, Set<ISubscriber<ID, DATA>>>() {
                @Override
                public Set<ISubscriber<ID, DATA>> load(String channel) {
                    return new HashSet<>();
                }
            });

    /** Jedis callback object: tracks subscription state and forwards incoming messages. */
    private final BinaryJedisPubSub myPubSubGateway = new BinaryJedisPubSub() {
        @Override
        public void onSubscribe(byte[] channel, int subscribedChannels) {
            ready = true;
        }

        @Override
        public void onUnsubscribe(byte[] channel, int subscribedChannels) {
            ready = false;
        }

        @Override
        public void onPSubscribe(byte[] pattern, int subscribedChannels) {
            ready = true;
        }

        @Override
        public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
            ready = false;
        }

        /**
         * Delivers one raw message to every subscriber registered for the channel.
         *
         * @param channelBytes UTF-8 encoded channel name
         * @param payload serialized message
         */
        private void handleMessage(byte[] channelBytes, byte[] payload) {
            String channel = new String(channelBytes, StandardCharsets.UTF_8);
            try {
                // getIfPresent (not get): the hub listens on "*", so loading an entry here would
                // grow the cache by one empty set for every channel that ever carries traffic.
                Set<ISubscriber<ID, DATA>> registered = subscriptions.getIfPresent(channel);
                if (registered == null) {
                    return;
                }
                // Snapshot under the lock, dispatch outside it: a subscriber may then call
                // subscribe/unsubscribe from onMessage without a ConcurrentModificationException,
                // and a slow subscriber does not block registration on other threads.
                Set<ISubscriber<ID, DATA>> snapshot;
                synchronized (registered) {
                    snapshot = new HashSet<>(registered);
                }
                if (snapshot.isEmpty()) {
                    return;
                }
                IMessage<ID, DATA> message = deserialize(payload);
                for (ISubscriber<ID, DATA> subscriber : snapshot) {
                    try {
                        subscriber.onMessage(channel, message);
                    } catch (Exception e) {
                        // One failing subscriber must not prevent delivery to the others.
                        LOGGER.warn(e.getMessage(), e);
                    }
                }
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
        }

        @Override
        public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
            handleMessage(channel, message);
        }

        @Override
        public void onMessage(byte[] channel, byte[] message) {
            handleMessage(channel, message);
        }
    };

    /**
     * Returns the Redis endpoint in {@code host:port} form.
     *
     * @return the configured endpoint; defaults to {@link #DEFAULT_HOST_AND_PORT}
     */
    public String getRedisHostAndPort() {
        return this.redisHostAndPort;
    }

    /**
     * Sets the Redis endpoint used when the connector is built.
     *
     * @param str endpoint in {@code host:port} form
     * @return this hub, for chaining
     */
    public RedisPubSubHub<ID, DATA> setRedisHostAndPort(String str) {
        this.redisHostAndPort = str;
        return this;
    }

    /**
     * Reports whether the background subscription is currently active.
     *
     * @return {@code true} after Redis confirmed the subscription and until it is cancelled or the
     *     listener thread terminates
     */
    public boolean isReady() {
        return this.ready;
    }

    /**
     * Builds and initializes a connector for the configured host/port and password, using the
     * default Jedis pool configuration.
     */
    @Override // com.github.ddth.pubsub.impl.BaseRedisPubSubHub
    protected JedisConnector buildJedisConnector() {
        JedisConnector jedisConnector = new JedisConnector();
        jedisConnector.setJedisPoolConfig(JedisUtils.defaultJedisPoolConfig())
                .setRedisHostsAndPorts(getRedisHostAndPort())
                .setRedisPassword(getRedisPassword())
                .init();
        return jedisConnector;
    }

    /**
     * Initializes the hub and starts the background thread that subscribes to all channels.
     *
     * <p>The subscription is established asynchronously; poll {@link #isReady()} to learn when it
     * is active.
     *
     * @return this hub, for chaining
     */
    @Override // com.github.ddth.pubsub.impl.BaseRedisPubSubHub, com.github.ddth.pubsub.impl.AbstractPubSubHub
    public RedisPubSubHub<ID, DATA> init() {
        super.init();
        Thread listener = new Thread(() -> {
            try {
                this.myPubSubJedis = getJedisConnector().getJedis();
                // Jedis psubscribe occupies the calling thread for the life of the subscription,
                // hence the dedicated thread. Explicit UTF-8 avoids the platform default charset.
                this.myPubSubJedis.psubscribe(this.myPubSubGateway, "*".getBytes(StandardCharsets.UTF_8));
            } catch (RuntimeException e) {
                LOGGER.warn(e.getMessage(), e);
            } finally {
                // Whatever the reason the subscription ended, the hub no longer receives messages.
                this.ready = false;
            }
        }, LISTENER_THREAD_NAME);
        listener.start();
        return this;
    }

    /**
     * Cancels the subscription (waiting up to {@value #UNSUBSCRIBE_TIMEOUT_MS} ms for Redis to
     * confirm), closes the subscription connection and then destroys the parent hub. Failures in
     * the first two steps are logged and do not prevent the remaining steps.
     */
    @Override // com.github.ddth.pubsub.impl.BaseRedisPubSubHub, com.github.ddth.pubsub.impl.AbstractPubSubHub
    public void destroy() {
        try {
            try {
                this.myPubSubGateway.punsubscribe();
                long deadline = System.currentTimeMillis() + UNSUBSCRIBE_TIMEOUT_MS;
                while (this.myPubSubGateway.isSubscribed() && System.currentTimeMillis() < deadline) {
                    Thread.sleep(1L);
                }
            } catch (InterruptedException e) {
                // Stop waiting but keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                LOGGER.warn(e.getMessage(), e);
            } catch (Exception e) {
                // Best effort: e.g. the subscription may never have been established.
                LOGGER.warn(e.getMessage(), e);
            }
            try {
                Jedis pubSubJedis = this.myPubSubJedis;
                if (pubSubJedis != null) {
                    pubSubJedis.close();
                }
            } catch (Exception e) {
                LOGGER.warn(e.getMessage(), e);
            }
        } finally {
            super.destroy();
        }
    }

    /**
     * Serializes the message and publishes it to the given Redis channel.
     *
     * @param str channel name (encoded as UTF-8)
     * @param iMessage message to publish
     * @return {@code true} if Redis returned a (non-null) receiver count
     */
    @Override // com.github.ddth.pubsub.IPubSubHub
    public boolean publish(String str, IMessage<ID, DATA> iMessage) {
        try (Jedis jedis = getJedisConnector().getJedis()) {
            return jedis.publish(str.getBytes(StandardCharsets.UTF_8), serialize(iMessage)) != null;
        }
    }

    /**
     * Registers a local subscriber for the given channel.
     *
     * @param str channel name
     * @param iSubscriber subscriber to notify for messages on that channel
     * @throws RuntimeException if the subscriber set for the channel cannot be loaded
     */
    @Override // com.github.ddth.pubsub.IPubSubHub
    public void subscribe(String str, ISubscriber<ID, DATA> iSubscriber) {
        try {
            Set<ISubscriber<ID, DATA>> registered = this.subscriptions.get(str);
            synchronized (registered) {
                registered.add(iSubscriber);
            }
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Removes a local subscriber from the given channel; a no-op if the channel has no
     * registrations.
     *
     * @param str channel name
     * @param iSubscriber subscriber to remove
     */
    @Override // com.github.ddth.pubsub.IPubSubHub
    public void unsubscribe(String str, ISubscriber<ID, DATA> iSubscriber) {
        // getIfPresent: unsubscribing from an unknown channel must not create a cache entry.
        Set<ISubscriber<ID, DATA>> registered = this.subscriptions.getIfPresent(str);
        if (registered == null) {
            return;
        }
        synchronized (registered) {
            registered.remove(iSubscriber);
        }
    }
}
