package streaming.common.zk;

import net.csdn.ServiceFramwork;
import net.csdn.common.logging.CSLogger;
import net.csdn.common.logging.Loggers;
import net.csdn.common.network.NetworkUtils;
import net.csdn.common.settings.ImmutableSettings;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import scala.Predef$;
import scala.Predef$any2stringadd$;
import tech.mlsql.common.utils.shell.command.ParamsUtil;

/* compiled from: ZkRegister.scala */
/* loaded from: input_file:streaming/common/zk/ZkRegister$.class */
public final class ZkRegister$ {
    public static ZkRegister$ MODULE$;
    private final CSLogger logger;

    static {
        new ZkRegister$();
    }

    public CSLogger logger() {
        return this.logger;
    }

    public ZKClient registerToZk(ParamsUtil paramsUtil) {
        ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
        builder.put(Predef$any2stringadd$.MODULE$.$plus$extension(Predef$.MODULE$.any2stringadd(ServiceFramwork.mode), ".zk.conf_root_dir"), paramsUtil.getParam("streaming.zk.conf_root_dir"));
        builder.put(Predef$any2stringadd$.MODULE$.$plus$extension(Predef$.MODULE$.any2stringadd(ServiceFramwork.mode), ".zk.servers"), paramsUtil.getParam("streaming.zk.servers"));
        ZKClient zKClient = new ZKClient(builder.build());
        final ZkClient zkClient = zKClient.zkConfUtil().client;
        if (!zkClient.exists(ZKConfUtil.CONF_ROOT_DIR)) {
            zkClient.createPersistent(ZKConfUtil.CONF_ROOT_DIR, true);
        }
        if (zkClient.exists(new StringBuilder(8).append(ZKConfUtil.CONF_ROOT_DIR).append("/address").toString())) {
            zkClient.delete(new StringBuilder(8).append(ZKConfUtil.CONF_ROOT_DIR).append("/address").toString());
            logger().error(new StringBuilder(27).append(ZKConfUtil.CONF_ROOT_DIR).append(" already exits in zookeeper").toString(), new Object[0]);
        }
        final String hostAddress = NetworkUtils.getFirstNonLoopbackAddress(NetworkUtils.StackType.IPv4).getHostAddress();
        final String param = paramsUtil.getParam("streaming.driver.port", "9003");
        logger().info(new StringBuilder(51).append("register ip and port to zookeeper:\n").append("zk=[").append(paramsUtil.getParam("streaming.zk.servers")).append("]\n").append(ZKConfUtil.CONF_ROOT_DIR).append("/address=").append(hostAddress).append(":").append(param).toString(), new Object[0]);
        final String sb = new StringBuilder(8).append(ZKConfUtil.CONF_ROOT_DIR).append("/address").toString();
        zkClient.createEphemeral(sb, new StringBuilder(1).append(hostAddress).append(":").append(param).toString());
        zkClient.subscribeDataChanges(sb, new IZkDataListener(sb, zkClient, hostAddress, param) { // from class: streaming.common.zk.ZkRegister$$anon$1
            private final String address$1;
            private final ZkClient client$1;
            private final String hostAddress$1;
            private final String port$1;

            public void handleDataChange(String str, Object obj) {
            }

            public void handleDataDeleted(String str) {
                ZkRegister$.MODULE$.logger().error(new StringBuilder(36).append(this.address$1).append("=").append(str).append(" removed by zookeeper, create again").toString(), new Object[0]);
                this.client$1.createEphemeral(this.address$1, new StringBuilder(1).append(this.hostAddress$1).append(":").append(this.port$1).toString());
            }

            {
                this.address$1 = sb;
                this.client$1 = zkClient;
                this.hostAddress$1 = hostAddress;
                this.port$1 = param;
            }
        });
        return zKClient;
    }

    private ZkRegister$() {
        MODULE$ = this;
        this.logger = Loggers.getLogger(ZkRegister.class);
    }
}
