package org.apache.pulsar.io.redis;

import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.codec.ByteArrayCodec;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.io.redis.RedisAbstractConfig;
import org.apache.pulsar.io.redis.sink.RedisSinkConfig;

/* loaded from: input_file:org/apache/pulsar/io/redis/RedisSession.class */
public class RedisSession {
    private final AbstractRedisClient client;
    private final StatefulConnection connection;
    private final RedisClusterAsyncCommands<byte[], byte[]> asyncCommands;

    public RedisSession(AbstractRedisClient abstractRedisClient, StatefulConnection statefulConnection, RedisClusterAsyncCommands<byte[], byte[]> redisClusterAsyncCommands) {
        this.client = abstractRedisClient;
        this.connection = statefulConnection;
        this.asyncCommands = redisClusterAsyncCommands;
    }

    public AbstractRedisClient client() {
        return this.client;
    }

    public StatefulConnection connection() {
        return this.connection;
    }

    public RedisClusterAsyncCommands<byte[], byte[]> asyncCommands() {
        return this.asyncCommands;
    }

    public void close() throws Exception {
        if (null != this.connection) {
            this.connection.close();
        }
        if (null != this.client) {
            this.client.shutdown();
        }
    }

    public static RedisSession create(RedisSinkConfig redisSinkConfig) {
        RedisSession redisSession;
        ByteArrayCodec byteArrayCodec = new ByteArrayCodec();
        SocketOptions build = SocketOptions.builder().tcpNoDelay(redisSinkConfig.isTcpNoDelay()).connectTimeout(Duration.ofMillis(redisSinkConfig.getConnectTimeout())).keepAlive(redisSinkConfig.isKeepAlive()).build();
        try {
            RedisAbstractConfig.ClientMode valueOf = RedisAbstractConfig.ClientMode.valueOf(redisSinkConfig.getClientMode().toUpperCase());
            List<RedisURI> redisURIs = redisURIs(redisSinkConfig.getHostAndPorts(), redisSinkConfig);
            if (valueOf == RedisAbstractConfig.ClientMode.STANDALONE) {
                ClientOptions.Builder autoReconnect = ClientOptions.builder().socketOptions(build).requestQueueSize(redisSinkConfig.getRequestQueue()).autoReconnect(redisSinkConfig.isAutoReconnect());
                RedisClient create = RedisClient.create(redisURIs.get(0));
                create.setOptions(autoReconnect.build());
                StatefulRedisConnection connect = create.connect(byteArrayCodec);
                redisSession = new RedisSession(create, connect, connect.async());
            } else {
                if (valueOf != RedisAbstractConfig.ClientMode.CLUSTER) {
                    throw new UnsupportedOperationException(String.format("%s is not supported", redisSinkConfig.getClientMode()));
                }
                ClusterClientOptions.Builder autoReconnect2 = ClusterClientOptions.builder().requestQueueSize(redisSinkConfig.getRequestQueue()).autoReconnect(redisSinkConfig.isAutoReconnect());
                RedisClusterClient create2 = RedisClusterClient.create(redisURIs);
                create2.setOptions(autoReconnect2.build());
                StatefulRedisClusterConnection connect2 = create2.connect(byteArrayCodec);
                redisSession = new RedisSession(create2, connect2, connect2.async());
            }
            return redisSession;
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Illegal Redis client mode, valid values are: " + String.valueOf(Arrays.asList(RedisAbstractConfig.ClientMode.values())));
        }
    }

    private static List<RedisURI> redisURIs(List<HostAndPort> list, RedisSinkConfig redisSinkConfig) {
        ArrayList newArrayList = Lists.newArrayList();
        for (HostAndPort hostAndPort : list) {
            RedisURI.Builder builder = RedisURI.builder();
            builder.withHost(hostAndPort.getHost());
            builder.withPort(hostAndPort.getPort());
            builder.withDatabase(redisSinkConfig.getRedisDatabase());
            if (!StringUtils.isBlank(redisSinkConfig.getRedisPassword())) {
                builder.withPassword(redisSinkConfig.getRedisPassword());
            }
            newArrayList.add(builder.build());
        }
        return newArrayList;
    }
}
