package com.jeesuite.scheduler.registry;

import com.jeesuite.common.json.JsonUtils;
import com.jeesuite.scheduler.AbstractJob;
import com.jeesuite.scheduler.JobContext;
import com.jeesuite.scheduler.JobRegistry;
import com.jeesuite.scheduler.model.JobConfig;
import com.jeesuite.scheduler.monitor.MonitorCommond;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.zookeeper.data.Stat;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:com/jeesuite/scheduler/registry/ZkJobRegistry.class */
/**
 * {@link JobRegistry} implementation that keeps job configuration and the list of live
 * scheduler nodes in ZooKeeper.
 *
 * <p>Layout used by this class (all paths are built in the code below):
 * <ul>
 *   <li>{@code /schedulers/<group>/<job>} - persistent node holding the JSON form of a {@link JobConfig}</li>
 *   <li>{@code /schedulers/<group>/nodes/<nodeId>} - ephemeral node, one per live scheduler node;
 *       data written to it is interpreted as a {@link MonitorCommond}</li>
 * </ul>
 *
 * <p>A local copy of every registered job config is kept in {@link #schedulerConfgs} and is
 * refreshed by ZooKeeper data listeners.
 */
public class ZkJobRegistry implements JobRegistry, InitializingBean, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(ZkJobRegistry.class);
    public static final String ROOT = "/schedulers/";
    private String zkServers;
    private ZkClient zkClient;
    private String groupPath;
    private String nodeStateParentPath;
    private ScheduledExecutorService zkCheckTask;
    /**
     * Cleared at the start and end of {@link #setRuning} / {@link #setStoping}.
     * NOTE(review): nothing in this class ever sets the flag to {@code true}, so the wait in
     * {@link #rebalanceJobNode} never actually blocks. Setting it to {@code true} naively would
     * deadlock, because the status setters call the synchronized {@link #getConf} while
     * {@code rebalanceJobNode} holds the same monitor - confirm the intended protocol.
     */
    private volatile boolean updatingStatus;
    /** Local cache of job configs, keyed by job name. */
    private Map<String, JobConfig> schedulerConfgs = new ConcurrentHashMap<>();
    private volatile boolean zkAvailabled = true;
    /** Guarded by {@code this}; set once the node/command listeners are installed. */
    private boolean nodeEventSubscribed = false;

    public void setZkServers(String str) {
        this.zkServers = str;
    }

    /**
     * Connects to ZooKeeper and starts the periodic cluster check
     * (first run after 60 seconds, then every 30 seconds).
     */
    public void afterPropertiesSet() throws Exception {
        this.zkClient = new ZkClient(new ZkConnection(this.zkServers), 10000);
        this.zkCheckTask = Executors.newScheduledThreadPool(1);
        this.zkCheckTask.scheduleAtFixedRate(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    checkClusterState();
                } catch (Exception e) {
                    // An exception escaping a scheduleAtFixedRate task cancels every later run,
                    // so failures of a single check must be contained here.
                    logger.warn("zk check task error", e);
                }
            }
        }, 60L, 30L, TimeUnit.SECONDS);
    }

    /**
     * One pass of the periodic check: reads the live node list (falling back to the locally
     * cached list when ZooKeeper cannot be read), re-creates this node's ephemeral entry if it
     * is missing, and re-assigns every job whose current node is no longer live to the first
     * node in sorted order. Only the local cache is updated here.
     */
    private void checkClusterState() {
        if (schedulerConfgs.isEmpty()) {
            return;
        }
        String localNodeId = JobContext.getContext().getNodeId();
        List<String> activeNodes;
        try {
            activeNodes = new ArrayList<String>(zkClient.getChildren(nodeStateParentPath));
            zkAvailabled = true;
        } catch (Exception e) {
            checkZkAvailabled();
            activeNodes = new ArrayList<String>(JobContext.getContext().getActiveNodes());
        }
        if (!activeNodes.contains(localNodeId)) {
            try {
                zkClient.createEphemeral(nodeStateParentPath + "/" + localNodeId);
                // The node is part of the cluster again, so it is a valid assignment target.
                activeNodes.add(localNodeId);
                logger.info("node[{}] re-join task clusters", localNodeId);
            } catch (Exception e) {
                // ZooKeeper down, or the node was re-created concurrently; retry on the next tick.
                logger.warn("node[" + localNodeId + "] re-join task clusters error", e);
            }
        }
        if (activeNodes.isEmpty()) {
            logger.warn("activeNodeList is empty, skip job node check");
            return;
        }
        Collections.sort(activeNodes);
        String fallbackNodeId = activeNodes.get(0);
        for (JobConfig jobConfig : schedulerConfgs.values()) {
            // Capture the old id before overwriting it so the log shows the real previous value.
            String previousNodeId = jobConfig.getCurrentNodeId();
            if (!activeNodes.contains(previousNodeId)) {
                jobConfig.setCurrentNodeId(fallbackNodeId);
                logger.warn("Job[{}-{}] currentNodeId[{}] not in activeNodeList, assign new ExecuteNodeId:{}", new Object[]{jobConfig.getGroupName(), jobConfig.getJobName(), previousNodeId, fallbackNodeId});
            }
        }
    }

    /**
     * Registers a job with the cluster.
     *
     * <p>The supplied config is written to ZooKeeper when this is the first live node of the
     * group, or when the stored config is missing, has a different cron expression, has a
     * next-fire time in the past, or was last modified more than 30 minutes ago. Otherwise the
     * stored config replaces the supplied one in the local cache. When this is the first live
     * node, every other job path under the group is deleted.
     *
     * @param jobConfig the job to register; its modify time is updated
     */
    @Override // com.jeesuite.scheduler.JobRegistry
    public synchronized void register(JobConfig jobConfig) {
        Calendar now = Calendar.getInstance();
        long nowMillis = now.getTimeInMillis();
        jobConfig.setModifyTime(nowMillis);
        if (this.groupPath == null) {
            this.groupPath = ROOT + jobConfig.getGroupName();
        }
        if (this.nodeStateParentPath == null) {
            this.nodeStateParentPath = this.groupPath + "/nodes";
        }
        String path = getPath(jobConfig);
        final String jobName = jobConfig.getJobName();

        // "First node" means nobody else is currently registered under <group>/nodes.
        boolean firstNode;
        if (!this.zkClient.exists(this.nodeStateParentPath)) {
            firstNode = true;
            this.zkClient.createPersistent(this.nodeStateParentPath, true);
        } else {
            firstNode = this.zkClient.getChildren(this.nodeStateParentPath).isEmpty();
        }
        if (!this.zkClient.exists(path)) {
            this.zkClient.createPersistent(path, true);
        }

        boolean overwrite = firstNode;
        if (!overwrite) {
            JobConfig stored = getConfigFromZK(path, null);
            if (stored == null) {
                overwrite = true;
            } else if (!StringUtils.equals(stored.getCronExpr(), jobConfig.getCronExpr())) {
                overwrite = true;
            } else if (stored.getNextFireTime() != null && stored.getNextFireTime().before(now.getTime())) {
                overwrite = true;
            } else if (nowMillis - stored.getModifyTime() > TimeUnit.MINUTES.toMillis(30L)) {
                overwrite = true;
            }
            if (!overwrite) {
                jobConfig = stored;
            }
        }
        if (overwrite) {
            jobConfig.setCurrentNodeId(JobContext.getContext().getNodeId());
            this.zkClient.writeData(path, JsonUtils.toJson(jobConfig));
        }
        this.schedulerConfgs.put(jobConfig.getJobName(), jobConfig);
        regAndSubscribeNodeEvent();
        // Keep the local cache in step with the job node in ZooKeeper.
        this.zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataDeleted(String str) throws Exception {
                schedulerConfgs.remove(jobName);
            }

            public void handleDataChange(String str, Object obj) throws Exception {
                if (obj == null) {
                    return;
                }
                schedulerConfgs.put(jobName, (JobConfig) JsonUtils.toObject(obj.toString(), JobConfig.class));
            }
        });
        if (firstNode) {
            for (String child : this.zkClient.getChildren(this.groupPath)) {
                if (!"nodes".equals(child) && !jobConfig.getJobName().equals(child)) {
                    this.zkClient.delete(this.groupPath + "/" + child);
                    logger.info("delete history job path:{}/{}", this.groupPath, child);
                }
            }
        }
        logger.info("finish register schConfig:{}", ToStringBuilder.reflectionToString(jobConfig, ToStringStyle.MULTI_LINE_STYLE));
    }

    /**
     * Creates this node's ephemeral entry and installs the node-list and command listeners.
     * Runs at most once per registry instance.
     */
    private synchronized void regAndSubscribeNodeEvent() {
        if (this.nodeEventSubscribed) {
            return;
        }
        final String localNodePath = this.nodeStateParentPath + "/" + JobContext.getContext().getNodeId();
        this.zkClient.createEphemeral(localNodePath);
        this.zkClient.subscribeChildChanges(this.nodeStateParentPath, new IZkChildListener() {
            public void handleChildChange(String str, List<String> list) throws Exception {
                String localNodeId = JobContext.getContext().getNodeId();
                if (list == null || !list.contains(localNodeId)) {
                    // This node dropped out of the list; put it back.
                    zkClient.createEphemeral(nodeStateParentPath + "/" + localNodeId);
                    logger.info("Nodelist is empty~ node[{}] re-join task clusters", localNodeId);
                } else {
                    logger.info(">>nodes changed ,nodes:{}", list);
                    rebalanceJobNode(list);
                    JobContext.getContext().refreshNodes(list);
                }
            }
        });
        logger.info("subscribe nodes change event at path:{}", this.nodeStateParentPath);
        this.zkClient.subscribeDataChanges(localNodePath, new IZkDataListener() {
            public void handleDataDeleted(String str2) throws Exception {
            }

            public void handleDataChange(String str2, Object obj) throws Exception {
                // Data on the node path is only meaningful when it is a command; anything else
                // (including null) previously failed with NPE/ClassCastException.
                if (!(obj instanceof MonitorCommond)) {
                    logger.warn("ignore non-command data at path:{}", str2);
                    return;
                }
                MonitorCommond monitorCommond = (MonitorCommond) obj;
                logger.info("收到commond:{}", monitorCommond);
                execCommond(monitorCommond);
            }
        });
        logger.info("subscribe command event at path:{}", localNodePath);
        List<String> children = this.zkClient.getChildren(this.nodeStateParentPath);
        JobContext.getContext().refreshNodes(children);
        logger.info("current activeNodes:{}", children);
        this.nodeEventSubscribed = true;
    }

    /**
     * Distributes the cached jobs over the given nodes in round-robin order (in the iteration
     * order of the local cache) and writes every job config back through
     * {@link #updateJobConfig(JobConfig)}.
     *
     * @param list live node ids; nothing is done when it is null or empty
     */
    public synchronized void rebalanceJobNode(List<String> list) {
        if (list == null || list.isEmpty()) {
            logger.warn("skip rebalance, node list is empty");
            return;
        }
        // See the note on updatingStatus: with the current code this loop exits immediately.
        while (this.updatingStatus) {
            // wait for a status update in progress
        }
        int next = 0;
        for (JobConfig jobConfig : this.schedulerConfgs.values()) {
            String targetNodeId = list.get(next);
            next = (next + 1) % list.size();
            if (!StringUtils.equals(jobConfig.getCurrentNodeId(), targetNodeId)) {
                jobConfig.setCurrentNodeId(targetNodeId);
                logger.info("rebalance Job[{}-{}] To Node[{}] ", new Object[]{jobConfig.getGroupName(), jobConfig.getJobName(), targetNodeId});
            }
            updateJobConfig(jobConfig);
        }
    }

    /**
     * Reads the JSON stored at the given path and converts it to a {@link JobConfig}.
     *
     * @param str  ZooKeeper path of the job node
     * @param stat optional stat object passed through to the client read; may be null
     * @return the stored config, or null when the node holds no data
     */
    private synchronized JobConfig getConfigFromZK(String str, Stat stat) {
        Object data;
        if (stat == null) {
            data = this.zkClient.readData(str);
        } else {
            data = this.zkClient.readData(str, stat);
        }
        if (data == null) {
            return null;
        }
        return (JobConfig) JsonUtils.toObject(data.toString(), JobConfig.class);
    }

    /**
     * Returns the config of a job.
     *
     * <p>When exactly one job is cached, the cached config is returned with its current node id
     * set to this node. Otherwise the cached config is returned, or - when {@code z} is true -
     * the config read from ZooKeeper; if that read fails the cached config is returned.
     *
     * @param str job name
     * @param z   whether to read the latest config from ZooKeeper
     * @return the config, or null when no job with that name is registered locally
     */
    @Override // com.jeesuite.scheduler.JobRegistry
    public synchronized JobConfig getConf(String str, boolean z) {
        JobConfig jobConfig = this.schedulerConfgs.get(str);
        if (jobConfig == null) {
            // Unknown job: previously this could fail with a NullPointerException.
            return null;
        }
        if (this.schedulerConfgs.size() == 1) {
            jobConfig.setCurrentNodeId(JobContext.getContext().getNodeId());
            return jobConfig;
        }
        if (!z) {
            return jobConfig;
        }
        try {
            jobConfig = getConfigFromZK(getPath(jobConfig), null);
        } catch (Exception e) {
            checkZkAvailabled();
            logger.warn("fecth JobConfig from Registry error", e);
        }
        return jobConfig;
    }

    /**
     * Deletes the job path from ZooKeeper when exactly one node is left under the node list.
     *
     * @param str job name; unknown names are ignored
     */
    @Override // com.jeesuite.scheduler.JobRegistry
    public synchronized void unregister(String str) {
        JobConfig jobConfig = this.schedulerConfgs.get(str);
        if (jobConfig == null) {
            logger.warn("Not found job config by name:{} !!!!", str);
            return;
        }
        String path = getPath(jobConfig);
        if (this.zkClient.getChildren(this.nodeStateParentPath).size() == 1) {
            this.zkClient.delete(path);
            logger.info("all node is closed ,delete path:{}", path);
        }
    }

    /** Builds the ZooKeeper path of a job: {@code /schedulers/<group>/<job>}. */
    private String getPath(JobConfig jobConfig) {
        return ROOT + jobConfig.getGroupName() + "/" + jobConfig.getJobName();
    }

    /** Stops the periodic check and closes the ZooKeeper client. */
    public void destroy() throws Exception {
        try {
            if (this.zkCheckTask != null) {
                this.zkCheckTask.shutdown();
            }
        } finally {
            // Close the client even if stopping the executor failed.
            if (this.zkClient != null) {
                this.zkClient.close();
            }
        }
    }

    /**
     * Marks a job as running on this node and, when ZooKeeper is considered available,
     * writes the updated config back.
     *
     * @param str  job name
     * @param date fire time recorded as the last fire time
     */
    @Override // com.jeesuite.scheduler.JobRegistry
    public void setRuning(String str, Date date) {
        this.updatingStatus = false;
        try {
            JobConfig conf = getConf(str, false);
            if (conf == null) {
                logger.warn("Not found job config by name:{} !!!!", str);
                return;
            }
            conf.setRunning(true);
            conf.setLastFireTime(date);
            conf.setCurrentNodeId(JobContext.getContext().getNodeId());
            conf.setModifyTime(Calendar.getInstance().getTimeInMillis());
            conf.setErrorMsg(null);
            this.schedulerConfgs.put(str, conf);
            try {
                if (this.zkAvailabled) {
                    this.zkClient.writeData(getPath(conf), JsonUtils.toJson(conf));
                }
            } catch (Exception e) {
                checkZkAvailabled();
                // SLF4J placeholders, not String.format: "{}" is not a format specifier.
                logger.warn("Job[{}] setRuning error...", str, e);
            }
        } finally {
            this.updatingStatus = false;
        }
    }

    /**
     * Marks a job as stopped, records its next fire time and error message, and, when
     * ZooKeeper is considered available, writes the updated config back.
     *
     * @param str  job name
     * @param date next fire time
     * @param exc  failure of the run, or null on success
     */
    @Override // com.jeesuite.scheduler.JobRegistry
    public void setStoping(String str, Date date, Exception exc) {
        this.updatingStatus = false;
        try {
            JobConfig conf = getConf(str, false);
            if (conf == null) {
                logger.warn("Not found job config by name:{} !!!!", str);
                return;
            }
            conf.setRunning(false);
            conf.setNextFireTime(date);
            conf.setModifyTime(Calendar.getInstance().getTimeInMillis());
            conf.setErrorMsg(exc == null ? null : exc.getMessage());
            this.schedulerConfgs.put(str, conf);
            try {
                if (this.zkAvailabled) {
                    this.zkClient.writeData(getPath(conf), JsonUtils.toJson(conf));
                }
            } catch (Exception e) {
                checkZkAvailabled();
                // SLF4J placeholders, not String.format: "{}" is not a format specifier.
                logger.warn("Job[{}] setStoping error...", str, e);
            }
        } finally {
            this.updatingStatus = false;
        }
    }

    /** Returns a snapshot copy of all locally cached job configs. */
    @Override // com.jeesuite.scheduler.JobRegistry
    public List<JobConfig> getAllJobs() {
        return new ArrayList<JobConfig>(this.schedulerConfgs.values());
    }

    /**
     * Probes ZooKeeper with an {@code exists} call and records the result in
     * {@link #zkAvailabled}.
     *
     * @return true when the probe completed without an exception
     */
    public boolean checkZkAvailabled() {
        try {
            // ZooKeeper rejects paths ending in '/', so probing ROOT itself ("/schedulers/")
            // would always throw and report the server as unavailable.
            this.zkClient.exists(ROOT.substring(0, ROOT.length() - 1));
            this.zkAvailabled = true;
        } catch (Exception e) {
            this.zkAvailabled = false;
            logger.warn("ZK server is not available....");
        }
        return this.zkAvailabled;
    }

    /**
     * Executes a monitor command received on this node's path.
     *
     * <p>Command types handled: 1 runs the job once on a new thread unless it is already
     * running; 2 sets the job's active flag (active when the body equals "1"); 3 replaces the
     * job's cron expression. Types 2 and 3 write the config back and pass it to the configured
     * persist handler, if any.
     *
     * @param monitorCommond the command; null is ignored
     * @throws RuntimeException when a type 3 command cannot be applied (the cause is attached)
     */
    public void execCommond(MonitorCommond monitorCommond) {
        if (monitorCommond == null) {
            return;
        }
        JobConfig jobConfig = this.schedulerConfgs.get(monitorCommond.getJobName());
        String jobKey = monitorCommond.getJobGroup() + ":" + monitorCommond.getJobName();
        final AbstractJob abstractJob = JobContext.getContext().getAllJobs().get(jobKey);
        int cmdType = monitorCommond.getCmdType();

        if (1 == cmdType) {
            if (jobConfig == null) {
                // Previously a NullPointerException on jobConfig.isRunning().
                logger.warn("Not found job config by name:{} !!!!", monitorCommond.getJobName());
                return;
            }
            if (jobConfig.isRunning()) {
                logger.info("任务正在执行中，请稍后再执行");
                return;
            }
            if (abstractJob == null) {
                logger.warn("Not found job by key:{} !!!!", jobKey);
                return;
            }
            new Thread(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        logger.info("begin execute job[{}] by MonitorCommond", abstractJob.getJobName());
                        abstractJob.doJob(JobContext.getContext());
                    } catch (Exception e) {
                        logger.error(abstractJob.getJobName(), e);
                    }
                }
            }).start();
            return;
        }

        if ((2 != cmdType && 3 != cmdType) || jobConfig == null) {
            return;
        }
        if (2 == cmdType) {
            jobConfig.setActive("1".equals(monitorCommond.getBody()));
        } else {
            try {
                String cronExpr = monitorCommond.getBody().toString();
                // Constructed only to validate the expression; it throws when it cannot be parsed.
                new CronExpression(cronExpr);
                abstractJob.resetTriggerCronExpr(cronExpr);
                jobConfig.setCronExpr(cronExpr);
            } catch (Exception e) {
                // Keep the cause: this also covers failures other than a bad expression
                // (e.g. a missing body or job), which the message alone would hide.
                throw new RuntimeException("cron表达式格式错误", e);
            }
        }
        updateJobConfig(jobConfig);
        if (JobContext.getContext().getConfigPersistHandler() != null) {
            JobContext.getContext().getConfigPersistHandler().persist(jobConfig);
        }
    }

    /**
     * Stamps the config with the current time, writes it to ZooKeeper and updates the local cache.
     *
     * @param jobConfig the config to store
     */
    @Override // com.jeesuite.scheduler.JobRegistry
    public void updateJobConfig(JobConfig jobConfig) {
        jobConfig.setModifyTime(Calendar.getInstance().getTimeInMillis());
        this.zkClient.writeData(getPath(jobConfig), JsonUtils.toJson(jobConfig));
        this.schedulerConfgs.put(jobConfig.getJobName(), jobConfig);
    }
}
