package org.apache.iotdb.confignode.consensus.statemachine;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
import org.apache.iotdb.confignode.persistence.schema.ConfignodeSnapshotParser;
import org.apache.iotdb.confignode.writelog.io.SingleFileLogReader;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.utils.writelog.LogWriter;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.class */
/**
 * State machine of the ConfigNode config region.
 *
 * <p>Write plans are executed through the {@link ConfigPlanExecutor}; plans that finish with
 * {@code SUCCESS_STATUS} are additionally offered to the pipe listener obtained from
 * {@link PipeConfigNodeAgent}. Read plans are executed through the same executor.
 *
 * <p>When the configured consensus protocol class is SimpleConsensus, every written plan is also
 * appended to a local log file below {@code <configRegionDir>/current}. {@link #start()} replays
 * the files found in that directory and schedules a periodic force of the current log writer.
 *
 * <p>NOTE(review): {@code simpleLogWriter} is replaced by the writing thread and read by the
 * scheduled flush task without any synchronization visible in this class — confirm whether that
 * is safe.
 */
public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.EventApi {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRegionStateMachine.class);
    private static final ExecutorService threadPool = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.CONFIG_NODE_RECOVER.getName());
    private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();

    /** Consensus protocol class name for which the local plan log is maintained. */
    private static final String SIMPLE_CONSENSUS_CLASS = "org.apache.iotdb.consensus.simple.SimpleConsensus";

    /** File-name prefix of the log file that is currently being written. */
    private static final String IN_PROGRESS_FILE_PREFIX = "log_inprogress_";

    private static final String CURRENT_FILE_DIR = ConsensusManager.getConfigRegionDir() + File.separator + "current";
    private static final String PROGRESS_FILE_PATH = CURRENT_FILE_DIR + File.separator + IN_PROGRESS_FILE_PREFIX;
    private static final String FILE_PATH = CURRENT_FILE_DIR + File.separator + "log_";
    private static final long LOG_FILE_MAX_SIZE = CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax();

    /** Number of attempts made to close the log writer when the log file is rolled. */
    private static final int LOG_WRITER_CLOSE_MAX_ATTEMPTS = 5;

    /** Matches the first run of digits in an in-progress log file name. */
    private static final Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+");

    /** Matches the trailing run of digits (preceded by '_') in a completed log file name. */
    private static final Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$");

    private final ConfigPlanExecutor executor;
    private final TEndPoint currentNodeTEndPoint = new TEndPoint().setIp(ConfigNodeDescriptor.getInstance().getConf().getInternalAddress()).setPort(ConfigNodeDescriptor.getInstance().getConf().getConsensusPort());
    private ConfigManager configManager;
    private LogWriter simpleLogWriter;
    private File simpleLogFile;
    private int startIndex;
    private int endIndex;

    /**
     * Orders log file names by the index extracted with
     * {@link ConfigRegionStateMachine#parseEndIndex(String)}.
     */
    public static class FileComparator implements Comparator<String> {
        FileComparator() {
        }

        @Override // java.util.Comparator
        public int compare(String str, String str2) {
            return Long.compare(ConfigRegionStateMachine.parseEndIndex(str), ConfigRegionStateMachine.parseEndIndex(str2));
        }
    }

    /**
     * @param configManager manager whose services are started/stopped on leadership changes
     * @param configPlanExecutor executor used for every read plan, write plan and snapshot
     */
    public ConfigRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor configPlanExecutor) {
        this.executor = configPlanExecutor;
        this.configManager = configManager;
    }

    public ConfigManager getConfigManager() {
        return this.configManager;
    }

    public void setConfigManager(ConfigManager configManager) {
        this.configManager = configManager;
    }

    /**
     * Executes a write request.
     *
     * @param iConsensusRequest the request; it is cast to {@link ConfigPhysicalPlan}
     * @return the execution status, or a status with {@code INTERNAL_SERVER_ERROR} when the request
     *     is {@code null} (or when the execution itself yields {@code null})
     */
    public TSStatus write(IConsensusRequest iConsensusRequest) {
        return Optional.ofNullable(iConsensusRequest)
                .map(request -> write((ConfigPhysicalPlan) request))
                .orElseGet(() -> new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
    }

    /**
     * Executes the plan, appends it to the local log in SimpleConsensus mode, and offers it to the
     * pipe listener when the execution status is {@code SUCCESS_STATUS}.
     *
     * @param configPhysicalPlan the plan to execute
     * @return the executor's status, or {@code INTERNAL_SERVER_ERROR} for an unknown plan type
     */
    protected TSStatus write(ConfigPhysicalPlan configPhysicalPlan) {
        TSStatus status;
        try {
            status = this.executor.executeNonQueryPlan(configPhysicalPlan);
        } catch (UnknownPhysicalPlanTypeException e) {
            // Pass the exception so the stack trace is not lost.
            LOGGER.error(e.getMessage(), e);
            status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
        }
        // The plan is logged regardless of the execution status.
        if (isSimpleConsensus()) {
            writeLogForSimpleConsensus(configPhysicalPlan);
        }
        if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            PipeConfigNodeAgent.runtime().listener().tryListenToPlan(configPhysicalPlan, false);
        }
        return status;
    }

    /**
     * Turns an incoming request into a {@link ConfigPhysicalPlan}.
     *
     * @param iConsensusRequest a {@link ByteBufferConsensusRequest} (deserialized through
     *     {@code ConfigPhysicalPlan.Factory}) or an already materialized {@link ConfigPhysicalPlan}
     * @return the plan, or {@code null} when the request is {@code null}, of an unexpected type, or
     *     cannot be deserialized
     */
    public IConsensusRequest deserializeRequest(IConsensusRequest iConsensusRequest) {
        if (iConsensusRequest == null) {
            // Guard: the "unexpected" branch below dereferences the request for logging.
            LOGGER.error("Unexpected write plan, request: null");
            return null;
        }
        if (iConsensusRequest instanceof ByteBufferConsensusRequest) {
            try {
                return ConfigPhysicalPlan.Factory.create(iConsensusRequest.serializeToByteBuffer());
            } catch (Exception e) {
                LOGGER.error("Deserialization error for write plan, request: {}, bytebuffer: {}", iConsensusRequest, iConsensusRequest.serializeToByteBuffer(), e);
                return null;
            }
        }
        if (iConsensusRequest instanceof ConfigPhysicalPlan) {
            return iConsensusRequest;
        }
        LOGGER.error("Unexpected write plan, request: {}, bytebuffer: {}", iConsensusRequest, iConsensusRequest.serializeToByteBuffer());
        return null;
    }

    /**
     * Executes a read request.
     *
     * @param iConsensusRequest the request; must be a {@link ConfigPhysicalReadPlan}
     * @return the query result, or {@code null} when the request has another type or fails
     */
    public DataSet read(IConsensusRequest iConsensusRequest) {
        if (iConsensusRequest instanceof ConfigPhysicalReadPlan) {
            return read((ConfigPhysicalReadPlan) iConsensusRequest);
        }
        LOGGER.error("Unexpected read plan : {}", iConsensusRequest);
        return null;
    }

    /**
     * Executes the read plan through the executor.
     *
     * @param configPhysicalReadPlan the plan to execute
     * @return the query result, or {@code null} when the plan type is unknown or an
     *     {@link AuthException} is raised
     */
    protected DataSet read(ConfigPhysicalReadPlan configPhysicalReadPlan) {
        try {
            return this.executor.executeQueryPlan(configPhysicalReadPlan);
        } catch (UnknownPhysicalPlanTypeException | AuthException e) {
            // Pass the exception so the stack trace is not lost.
            LOGGER.error(e.getMessage(), e);
            return null;
        }
    }

    /**
     * Takes a snapshot through the executor and then hands the snapshots reported by
     * {@link ConfignodeSnapshotParser} to the pipe listener.
     *
     * @param file snapshot target passed to the executor
     * @return {@code true} only when both the executor snapshot and the listener hand-over succeed
     */
    public boolean takeSnapshot(File file) {
        if (!this.executor.takeSnapshot(file)) {
            return false;
        }
        try {
            PipeConfigNodeAgent.runtime().listener().tryListenToSnapshots(ConfignodeSnapshotParser.getSnapshots());
            return true;
        } catch (IOException e) {
            // Only worth a warning while the listener is open.
            if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
                LOGGER.warn("Config Region Listening Queue Listen to snapshot failed, the historical data may not be transferred.", e);
            }
            return false;
        }
    }

    /**
     * Loads a snapshot through the executor and then hands the snapshots reported by
     * {@link ConfignodeSnapshotParser} to the pipe listener.
     *
     * @param file snapshot source passed to the executor
     */
    public void loadSnapshot(File file) {
        try {
            this.executor.loadSnapshot(file);
            PipeConfigNodeAgent.runtime().listener().tryListenToSnapshots(ConfignodeSnapshotParser.getSnapshots());
        } catch (IOException e) {
            if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
                LOGGER.warn("Config Region Listening Queue Listen to snapshot failed when startup, snapshot will be tried again when starting schema transferring pipes", e);
            }
        }
    }

    /**
     * Logs a leadership change when the new leader is not this node.
     *
     * @param consensusGroupId the affected consensus group (unused here)
     * @param i node id of the new leader
     */
    public void notifyLeaderChanged(ConsensusGroupId consensusGroupId, int i) {
        // The node id is read on every call rather than cached in a field.
        int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
        if (currentNodeId != i) {
            LOGGER.info("Current node [nodeId:{}, ip:port: {}] is no longer the leader, the new leader is [nodeId:{}]", currentNodeId, this.currentNodeTEndPoint, i);
        }
    }

    /** Stops the leader-only services of the {@link ConfigManager} and of the pipe agent. */
    public void notifyNotLeader() {
        int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
        LOGGER.info("Current node [nodeId:{}, ip:port: {}] is no longer the leader, start cleaning up related services", currentNodeId, this.currentNodeTEndPoint);
        this.configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
        this.configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
        this.configManager.getSubscriptionManager().getSubscriptionCoordinator().stopSubscriptionMetaSync();
        this.configManager.getLoadManager().stopLoadServices();
        this.configManager.getProcedureManager().stopExecutor();
        this.configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
        this.configManager.getPartitionManager().stopRegionCleaner();
        this.configManager.getCQManager().stopCQScheduler();
        this.configManager.getClusterSchemaManager().clearSchemaQuotaCache();
        this.configManager.removeMetrics();
        PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
        PipeConfigNodeAgent.receiver().cleanPipeReceiverDir();
        LOGGER.info("Current node [nodeId:{}, ip:port: {}] is no longer the leader, all services on old leader are unavailable now.", currentNodeId, this.currentNodeTEndPoint);
    }

    /**
     * Starts the leader-only services. Some are started inline on the calling thread, the rest are
     * submitted to the shared thread pool and therefore may still be running when this method
     * returns.
     */
    public void notifyLeaderReady() {
        LOGGER.info("Current node [nodeId: {}, ip:port: {}] becomes config region leader", ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), this.currentNodeTEndPoint);
        this.configManager.getLoadManager().startLoadServices();
        this.configManager.getProcedureManager().startExecutor();
        submitLeaderTask("upgrade procedure info", () -> {
            this.configManager.getProcedureManager().getStore().getProcedureInfo().upgrade();
        });
        this.configManager.getRetryFailedTasksThread().startRetryFailedTasksService();
        this.configManager.getPartitionManager().startRegionCleaner();
        this.configManager.checkUserPathPrivilege();
        this.configManager.addMetrics();
        PipeConfigNodeAgent.runtime().notifyLeaderReady();
        submitLeaderTask("start CQ scheduler", () -> {
            this.configManager.getCQManager().startCQScheduler();
        });
        submitLeaderTask("start pipe meta sync", () -> {
            this.configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync();
        });
        submitLeaderTask("start pipe heartbeat", () -> {
            this.configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat();
        });
        submitLeaderTask("notify pipe coordinator of leader change", () -> {
            this.configManager.getPipeManager().getPipeRuntimeCoordinator().onConfigRegionGroupLeaderChanged();
        });
        submitLeaderTask("start subscription meta sync", () -> {
            this.configManager.getSubscriptionManager().getSubscriptionCoordinator().startSubscriptionMetaSync();
        });
        submitLeaderTask("check cluster id", () -> {
            this.configManager.getClusterManager().checkClusterId();
        });
        LOGGER.info("Current node [nodeId: {}, ip:port: {}] as config region leader is ready to work", ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), this.currentNodeTEndPoint);
    }

    /** Replays and re-opens the local plan log when running in SimpleConsensus mode. */
    public void start() {
        if (isSimpleConsensus()) {
            initStandAloneConfigNode();
        }
    }

    public void stop() {
        PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
    }

    public boolean isReadOnly() {
        return CommonDescriptor.getInstance().getConfig().isReadOnly();
    }

    /** Returns whether the configured consensus protocol class is SimpleConsensus. */
    private static boolean isSimpleConsensus() {
        return SIMPLE_CONSENSUS_CLASS.equals(CONF.getConfigNodeConsensusProtocolClass());
    }

    /**
     * Submits a task to the shared pool and logs any failure. The returned Future is never
     * inspected, so without this logging a failing task would disappear silently. The failure is
     * rethrown so that the Future completes exceptionally exactly as it would without the wrapper.
     */
    private static void submitLeaderTask(String taskName, Runnable task) {
        threadPool.submit(() -> {
            try {
                task.run();
            } catch (RuntimeException | Error e) {
                LOGGER.error("Asynchronous leader-ready task [{}] failed", taskName, e);
                throw e;
            }
        });
    }

    /**
     * Appends the plan to the current log file, rolling the file first when it has grown beyond
     * {@code LOG_FILE_MAX_SIZE}. {@code endIndex} is advanced only when the write succeeds.
     */
    private void writeLogForSimpleConsensus(ConfigPhysicalPlan configPhysicalPlan) {
        if (this.simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
            rollLogFile();
        }
        try {
            ByteBuffer buffer = configPhysicalPlan.serializeToByteBuffer();
            // NOTE(review): the writer is handed the buffer with position == limit; presumably
            // LogWriter flips it before writing — confirm against LogWriter.
            buffer.position(buffer.limit());
            this.simpleLogWriter.write(buffer);
            this.endIndex++;
        } catch (Exception e) {
            LOGGER.error("Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e);
        }
    }

    /**
     * Forces the current log file, renames it to {@code log_<startIndex>_<endIndex>}, closes its
     * writer and opens a fresh in-progress file starting at {@code endIndex + 1}. A failure to
     * force or rename is logged and does not prevent the new file from being opened.
     */
    private void rollLogFile() {
        try {
            this.simpleLogWriter.force();
            File completedFile = new File(FILE_PATH + this.startIndex + "_" + this.endIndex);
            Files.move(this.simpleLogFile.toPath(), completedFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e);
        }
        closeLogWriterWithRetry();
        this.startIndex = this.endIndex + 1;
        createLogFile(this.startIndex);
    }

    /** Closes the current log writer, retrying with a one-second pause after each failure. */
    private void closeLogWriterWithRetry() {
        for (int attempt = 0; attempt < LOG_WRITER_CLOSE_MAX_ATTEMPTS; attempt++) {
            try {
                this.simpleLogWriter.close();
                return;
            } catch (IOException e) {
                LOGGER.warn("Can't close StandAloneLog for ConfigNode SimpleConsensus mode, filePath: {}, retry: {}", this.simpleLogFile.getAbsolutePath(), attempt, e);
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    LOGGER.warn("Unexpected interruption during the close method of logWriter");
                }
            }
        }
    }

    /**
     * Replays every file in the log directory (ordered by {@link FileComparator}), opens a new
     * in-progress log file and schedules the periodic force of the log writer.
     */
    private void initStandAloneConfigNode() {
        File currentDir = new File(CURRENT_FILE_DIR);
        if (!currentDir.mkdirs() && !currentDir.isDirectory()) {
            LOGGER.warn("Can't create ConfigNode SimpleConsensus log directory: {}", currentDir.getAbsolutePath());
        }
        String[] logFileNames = currentDir.list();
        if (logFileNames == null || logFileNames.length == 0) {
            this.startIndex = 0;
            this.endIndex = 0;
        } else {
            Arrays.sort(logFileNames, new FileComparator());
            for (String logFileName : logFileNames) {
                replayLogFile(SystemFileFactory.INSTANCE.getFile(CURRENT_FILE_DIR + File.separator + logFileName));
            }
        }
        // NOTE(review): the new in-progress file is named after endIndex while startIndex is only
        // incremented by one; this differs from rollLogFile() (which uses endIndex + 1 for both).
        // Kept as-is because changing file naming affects recovery — confirm the intent.
        this.startIndex++;
        createLogFile(this.endIndex);
        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.CONFIG_NODE_SIMPLE_CONSENSUS_WAL_FLUSH.getName()), this::flushWALForSimpleConsensus, 0L, CONF.getForceWalPeriodForConfigNodeSimpleInMs(), TimeUnit.MILLISECONDS);
    }

    /**
     * Re-executes every plan stored in one log file, advancing {@code endIndex} per plan. The
     * reader is closed even when replay fails with an unchecked exception.
     */
    private void replayLogFile(File logFile) {
        try {
            SingleFileLogReader logReader = new SingleFileLogReader(logFile);
            try {
                this.startIndex = this.endIndex;
                while (logReader.hasNext()) {
                    this.endIndex++;
                    ConfigPhysicalPlan plan = logReader.next();
                    try {
                        TSStatus status = this.executor.executeNonQueryPlan(plan);
                        if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                            PipeConfigNodeAgent.runtime().listener().tryListenToPlan(plan, false);
                        }
                    } catch (UnknownPhysicalPlanTypeException e) {
                        // An unknown plan is skipped; replay continues with the next one.
                        LOGGER.error(e.getMessage(), e);
                    }
                }
            } finally {
                logReader.close();
            }
        } catch (FileNotFoundException e) {
            LOGGER.error("InitStandAloneConfigNode meets error, can't find standalone log files, filePath: {}", logFile.getAbsolutePath(), e);
        }
    }

    /** Periodic task: forces the current log writer, if one has been opened. */
    private void flushWALForSimpleConsensus() {
        // Read the field once so the null check and the call see the same writer.
        LogWriter writer = this.simpleLogWriter;
        if (writer == null) {
            return;
        }
        try {
            writer.force();
        } catch (IOException e) {
            LOGGER.error("Can't force logWriter for ConfigNode flushWALForSimpleConsensus", e);
        }
    }

    /**
     * Points {@code simpleLogFile} at {@code log_inprogress_<index>} and opens a writer on it. On
     * failure the previous writer is left in place and a warning is logged.
     */
    private void createLogFile(int index) {
        this.simpleLogFile = SystemFileFactory.INSTANCE.getFile(PROGRESS_FILE_PATH + index);
        try {
            if (!this.simpleLogFile.createNewFile()) {
                LOGGER.warn("ConfigNode SimpleConsensusFile has existed，filePath:{}", this.simpleLogFile.getAbsolutePath());
            }
            this.simpleLogWriter = new LogWriter(this.simpleLogFile, false);
            LOGGER.info("Create ConfigNode SimpleConsensusFile: {}", this.simpleLogFile.getAbsolutePath());
        } catch (Exception e) {
            LOGGER.warn("Create ConfigNode SimpleConsensusFile failed, filePath: {}", this.simpleLogFile.getAbsolutePath(), e);
        }
    }

    /**
     * Extracts the index used to order log files.
     *
     * <p>For names starting with {@code log_inprogress_} this is the first run of digits; for any
     * other name it is the run of digits at the very end that directly follows an underscore.
     *
     * @param str a file name
     * @return the parsed index, or {@code 0} when the name carries no such digits or the digits do
     *     not fit into a {@code long}
     */
    static long parseEndIndex(String str) {
        Pattern pattern = str.startsWith(IN_PROGRESS_FILE_PREFIX) ? LOG_INPROGRESS_PATTERN : LOG_PATTERN;
        Matcher matcher = pattern.matcher(str);
        if (!matcher.find()) {
            return 0L;
        }
        try {
            return Long.parseLong(matcher.group());
        } catch (NumberFormatException e) {
            // A digit run too long for a long must not make the comparator throw during sorting.
            return 0L;
        }
    }
}
