package org.apache.hadoop.yarn.server.federation.store.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.org.apache.zookeeper.data.ACL;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.records.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.class */
/**
 * {@link FederationStateStore} implementation that keeps the federation state
 * in ZooKeeper through a {@link ZKCuratorManager}.
 *
 * <p>Three znodes are created under the configured parent path:
 * <ul>
 *   <li>{@code memberships} - one child per sub-cluster, holding a serialized
 *       {@code SubClusterInfoProto};</li>
 *   <li>{@code applications} - one child per application, holding the
 *       serialized {@code SubClusterIdProto} of its home sub-cluster;</li>
 *   <li>{@code policies} - one child per queue, holding a serialized
 *       {@code SubClusterPolicyConfigurationProto}.</li>
 * </ul>
 *
 * <p>NOTE(review): the error paths rely on
 * {@code FederationStateStoreUtils.logAndThrowStoreException} always throwing;
 * that helper is defined outside this file - confirm.
 */
public class ZookeeperFederationStateStore implements FederationStateStore {
    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperFederationStateStore.class);
    private static final String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
    private static final String ROOT_ZNODE_NAME_APPLICATION = "applications";
    private static final String ROOT_ZNODE_NAME_POLICY = "policies";

    /** Connection to ZooKeeper; assigned in {@link #init(Configuration)}. */
    private ZKCuratorManager zkManager;
    /** Parent znode under which all federation state lives. */
    private String baseZNode;
    /** Znode holding one child per application. */
    private String appsZNode;
    /** Znode holding one child per sub-cluster. */
    private String membershipZNode;
    /** Znode holding one child per queue policy. */
    private String policiesZNode;

    /**
     * Connects to ZooKeeper and creates the membership, application and policy
     * root znodes.
     *
     * @param configuration configuration providing the parent znode path, the
     *        ZooKeeper connection settings and the ACLs
     * @throws YarnException if the connection cannot be established or the
     *         root znodes cannot be created
     */
    @Override
    public void init(Configuration configuration) throws YarnException {
        LOG.info("Initializing ZooKeeper connection");
        this.baseZNode = configuration.get(
                YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH,
                YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH);
        try {
            this.zkManager = new ZKCuratorManager(configuration);
            this.zkManager.start();
        } catch (IOException e) {
            // Fail fast: continuing with a missing or unstarted manager only
            // produces an uninformative failure when the root znodes are created.
            LOG.error("Cannot initialize the ZK connection", e);
            throw new YarnException("Cannot initialize the ZK connection", e);
        }

        this.membershipZNode = ZKCuratorManager.getNodePath(this.baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
        this.appsZNode = ZKCuratorManager.getNodePath(this.baseZNode, ROOT_ZNODE_NAME_APPLICATION);
        this.policiesZNode = ZKCuratorManager.getNodePath(this.baseZNode, ROOT_ZNODE_NAME_POLICY);
        try {
            List<ACL> zkAcls = ZKCuratorManager.getZKAcls(configuration);
            this.zkManager.createRootDirRecursively(this.membershipZNode, zkAcls);
            this.zkManager.createRootDirRecursively(this.appsZNode, zkAcls);
            this.zkManager.createRootDirRecursively(this.policiesZNode, zkAcls);
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot create base directories: " + e.getMessage());
        }
    }

    /**
     * Closes the ZooKeeper manager if one was created.
     *
     * @throws Exception if closing the manager fails
     */
    @Override
    public void close() throws Exception {
        if (this.zkManager != null) {
            this.zkManager.close();
        }
    }

    /**
     * Records the home sub-cluster of an application without overwriting an
     * existing entry, then reads the stored value back.
     *
     * @param addApplicationHomeSubClusterRequest request carrying the
     *        application id and the proposed home sub-cluster
     * @return response holding the home sub-cluster actually stored, which is
     *         the previously stored one if the application already had an entry
     * @throws YarnException if the request is invalid or the store cannot be
     *         written or read
     */
    @Override
    public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest addApplicationHomeSubClusterRequest) throws YarnException {
        FederationApplicationHomeSubClusterStoreInputValidator.validate(addApplicationHomeSubClusterRequest);
        ApplicationHomeSubCluster requested = addApplicationHomeSubClusterRequest.getApplicationHomeSubCluster();
        ApplicationId applicationId = requested.getApplicationId();
        SubClusterId homeSubCluster = requested.getHomeSubCluster();
        try {
            // update == false: an existing mapping is left untouched.
            putApp(applicationId, homeSubCluster, false);
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot add application home subcluster for " + applicationId);
        }
        try {
            // Read back so the caller learns the mapping that is really in the store.
            homeSubCluster = getApp(applicationId);
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot check app home subcluster for " + applicationId);
        }
        return AddApplicationHomeSubClusterResponse.newInstance(homeSubCluster);
    }

    /**
     * Overwrites the home sub-cluster of an application that is already
     * present in the store.
     *
     * @param updateApplicationHomeSubClusterRequest request carrying the
     *        application id and its new home sub-cluster
     * @return an empty response
     * @throws YarnException if the request is invalid, the application has no
     *         entry, or the store cannot be accessed
     */
    @Override
    public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(UpdateApplicationHomeSubClusterRequest updateApplicationHomeSubClusterRequest) throws YarnException {
        FederationApplicationHomeSubClusterStoreInputValidator.validate(updateApplicationHomeSubClusterRequest);
        ApplicationHomeSubCluster requested = updateApplicationHomeSubClusterRequest.getApplicationHomeSubCluster();
        ApplicationId applicationId = requested.getApplicationId();
        if (getApp(applicationId) == null) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Application " + applicationId + " does not exist");
        }
        putApp(applicationId, updateApplicationHomeSubClusterRequest.getApplicationHomeSubCluster().getHomeSubCluster(), true);
        return UpdateApplicationHomeSubClusterResponse.newInstance();
    }

    /**
     * Looks up the home sub-cluster of an application.
     *
     * @param getApplicationHomeSubClusterRequest request carrying the application id
     * @return response holding the application id and its home sub-cluster
     * @throws YarnException if the request is invalid, the application has no
     *         entry, or the store cannot be read
     */
    @Override
    public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(GetApplicationHomeSubClusterRequest getApplicationHomeSubClusterRequest) throws YarnException {
        FederationApplicationHomeSubClusterStoreInputValidator.validate(getApplicationHomeSubClusterRequest);
        ApplicationId applicationId = getApplicationHomeSubClusterRequest.getApplicationId();
        SubClusterId homeSubCluster = getApp(applicationId);
        if (homeSubCluster == null) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Application " + applicationId + " does not exist");
        }
        return GetApplicationHomeSubClusterResponse.newInstance(ApplicationHomeSubCluster.newInstance(applicationId, homeSubCluster));
    }

    /**
     * Lists every application found under the applications znode together
     * with its home sub-cluster.
     *
     * @param getApplicationsHomeSubClusterRequest the request (not inspected here)
     * @return response holding one entry per application that could be read
     * @throws YarnException if the children cannot be listed or read
     */
    @Override
    public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(GetApplicationsHomeSubClusterRequest getApplicationsHomeSubClusterRequest) throws YarnException {
        List<ApplicationHomeSubCluster> apps = new ArrayList<>();
        try {
            for (String child : this.zkManager.getChildren(this.appsZNode)) {
                ApplicationId applicationId = ApplicationId.fromString(child);
                SubClusterId homeSubCluster = getApp(applicationId);
                if (homeSubCluster == null) {
                    // The znode vanished (or is empty) between listing and
                    // reading; do not report an application without a home.
                    LOG.debug("Skipping application {}: no home subcluster stored", applicationId);
                    continue;
                }
                apps.add(ApplicationHomeSubCluster.newInstance(applicationId, homeSubCluster));
            }
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot get apps: " + e.getMessage());
        }
        return GetApplicationsHomeSubClusterResponse.newInstance(apps);
    }

    /**
     * Removes the entry of an application from the store.
     *
     * @param deleteApplicationHomeSubClusterRequest request carrying the application id
     * @return an empty response
     * @throws YarnException if the request is invalid, the application has no
     *         entry, or the znode cannot be checked or deleted
     */
    @Override
    public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(DeleteApplicationHomeSubClusterRequest deleteApplicationHomeSubClusterRequest) throws YarnException {
        FederationApplicationHomeSubClusterStoreInputValidator.validate(deleteApplicationHomeSubClusterRequest);
        ApplicationId applicationId = deleteApplicationHomeSubClusterRequest.getApplicationId();
        String appZNode = ZKCuratorManager.getNodePath(this.appsZNode, applicationId.toString());

        boolean exists = false;
        try {
            exists = this.zkManager.exists(appZNode);
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot check app: " + e.getMessage());
        }
        if (!exists) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Application " + applicationId + " does not exist");
        }

        try {
            this.zkManager.delete(appZNode);
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot delete app: " + e.getMessage());
        }
        return DeleteApplicationHomeSubClusterResponse.newInstance();
    }

    /**
     * Registers a sub-cluster, stamping it with the current time as its last
     * heartbeat and overwriting any existing entry.
     *
     * @param subClusterRegisterRequest request carrying the sub-cluster information
     * @return an empty response
     * @throws YarnException if the request is invalid or the store cannot be written
     */
    @Override
    public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest subClusterRegisterRequest) throws YarnException {
        FederationMembershipStateStoreInputValidator.validate(subClusterRegisterRequest);
        SubClusterInfo subClusterInfo = subClusterRegisterRequest.getSubClusterInfo();
        SubClusterId subClusterId = subClusterInfo.getSubClusterId();
        subClusterInfo.setLastHeartBeat(getCurrentTime());
        try {
            putSubclusterInfo(subClusterId, subClusterInfo, true);
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot register subcluster: " + e.getMessage());
        }
        return SubClusterRegisterResponse.newInstance();
    }

    /**
     * Sets the state of an existing sub-cluster to the one given in the request.
     *
     * @param subClusterDeregisterRequest request carrying the sub-cluster id
     *        and the state to store
     * @return an empty response
     * @throws YarnException if the request is invalid, the sub-cluster is
     *         unknown, or the store cannot be accessed
     */
    @Override
    public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest subClusterDeregisterRequest) throws YarnException {
        FederationMembershipStateStoreInputValidator.validate(subClusterDeregisterRequest);
        SubClusterId subClusterId = subClusterDeregisterRequest.getSubClusterId();
        SubClusterState newState = subClusterDeregisterRequest.getState();
        SubClusterInfo stored = getSubclusterInfo(subClusterId);
        if (stored == null) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "SubCluster " + subClusterId + " not found");
        } else {
            stored.setState(newState);
            putSubclusterInfo(subClusterId, stored, true);
        }
        return SubClusterDeregisterResponse.newInstance();
    }

    /**
     * Refreshes the last-heartbeat time, state and capability of an existing
     * sub-cluster.
     *
     * @param subClusterHeartbeatRequest request carrying the sub-cluster id,
     *        its state and its capability
     * @return an empty response
     * @throws YarnException if the request is invalid, the sub-cluster is
     *         unknown, or the store cannot be accessed
     */
    @Override
    public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest subClusterHeartbeatRequest) throws YarnException {
        FederationMembershipStateStoreInputValidator.validate(subClusterHeartbeatRequest);
        SubClusterId subClusterId = subClusterHeartbeatRequest.getSubClusterId();
        SubClusterInfo stored = getSubclusterInfo(subClusterId);
        if (stored == null) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "SubCluster " + subClusterId + " does not exist; cannot heartbeat");
        }
        stored.setLastHeartBeat(getCurrentTime());
        stored.setState(subClusterHeartbeatRequest.getState());
        stored.setCapability(subClusterHeartbeatRequest.getCapability());
        putSubclusterInfo(subClusterId, stored, true);
        return SubClusterHeartbeatResponse.newInstance();
    }

    /**
     * Looks up the information stored for a sub-cluster.
     *
     * @param getSubClusterInfoRequest request carrying the sub-cluster id
     * @return response holding the sub-cluster information, or {@code null}
     *         if the sub-cluster has no entry
     * @throws YarnException if the request is invalid or the store cannot be read
     */
    @Override
    public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest getSubClusterInfoRequest) throws YarnException {
        FederationMembershipStateStoreInputValidator.validate(getSubClusterInfoRequest);
        SubClusterId subClusterId = getSubClusterInfoRequest.getSubClusterId();
        SubClusterInfo subClusterInfo = null;
        try {
            subClusterInfo = getSubclusterInfo(subClusterId);
            if (subClusterInfo == null) {
                LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
                return null;
            }
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot get subcluster: " + e.getMessage());
        }
        return GetSubClusterInfoResponse.newInstance(subClusterInfo);
    }

    /**
     * Lists the sub-clusters found under the membership znode, optionally
     * keeping only those whose state is active.
     *
     * @param getSubClustersInfoRequest request indicating whether inactive
     *        sub-clusters must be filtered out
     * @return response holding the matching sub-clusters
     * @throws YarnException if the children cannot be listed or read
     */
    @Override
    public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest getSubClustersInfoRequest) throws YarnException {
        List<SubClusterInfo> subClusters = new ArrayList<>();
        try {
            for (String child : this.zkManager.getChildren(this.membershipZNode)) {
                SubClusterInfo info = getSubclusterInfo(SubClusterId.newInstance(child));
                if (info == null) {
                    // The znode vanished (or is empty) between listing and
                    // reading; there is nothing to report for it.
                    LOG.debug("Skipping subcluster {}: no information stored", child);
                    continue;
                }
                if (!getSubClustersInfoRequest.getFilterInactiveSubClusters() || info.getState().isActive()) {
                    subClusters.add(info);
                }
            }
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot get subclusters: " + e.getMessage());
        }
        return GetSubClustersInfoResponse.newInstance(subClusters);
    }

    /**
     * Looks up the policy configuration stored for a queue.
     *
     * @param getSubClusterPolicyConfigurationRequest request carrying the queue name
     * @return response holding the policy, or {@code null} if the queue has no policy
     * @throws YarnException if the request is invalid or the store cannot be read
     */
    @Override
    public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(GetSubClusterPolicyConfigurationRequest getSubClusterPolicyConfigurationRequest) throws YarnException {
        FederationPolicyStoreInputValidator.validate(getSubClusterPolicyConfigurationRequest);
        String queue = getSubClusterPolicyConfigurationRequest.getQueue();
        SubClusterPolicyConfiguration policy = null;
        try {
            policy = getPolicy(queue);
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot get policy: " + e.getMessage());
        }
        if (policy == null) {
            LOG.warn("Policy for queue: {} does not exist.", queue);
            return null;
        }
        return GetSubClusterPolicyConfigurationResponse.newInstance(policy);
    }

    /**
     * Stores the policy configuration of a queue, overwriting any existing one.
     *
     * @param setSubClusterPolicyConfigurationRequest request carrying the policy
     * @return an empty response
     * @throws YarnException if the request is invalid or the store cannot be written
     */
    @Override
    public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest setSubClusterPolicyConfigurationRequest) throws YarnException {
        FederationPolicyStoreInputValidator.validate(setSubClusterPolicyConfigurationRequest);
        SubClusterPolicyConfiguration policy = setSubClusterPolicyConfigurationRequest.getPolicyConfiguration();
        try {
            putPolicy(policy.getQueue(), policy, true);
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot set policy: " + e.getMessage());
        }
        return SetSubClusterPolicyConfigurationResponse.newInstance();
    }

    /**
     * Lists every policy configuration found under the policies znode.
     *
     * @param getSubClusterPoliciesConfigurationsRequest the request (not inspected here)
     * @return response holding one entry per policy that could be read
     * @throws YarnException if the children cannot be listed or read
     */
    @Override
    public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(GetSubClusterPoliciesConfigurationsRequest getSubClusterPoliciesConfigurationsRequest) throws YarnException {
        List<SubClusterPolicyConfiguration> policies = new ArrayList<>();
        try {
            for (String queue : this.zkManager.getChildren(this.policiesZNode)) {
                SubClusterPolicyConfiguration policy = getPolicy(queue);
                if (policy == null) {
                    // The znode vanished (or is empty) between listing and
                    // reading; never hand a null element back to the caller.
                    LOG.debug("Skipping policy for queue {}: nothing stored", queue);
                    continue;
                }
                policies.add(policy);
            }
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot get policies: " + e.getMessage());
        }
        return GetSubClusterPoliciesConfigurationsResponse.newInstance(policies);
    }

    /**
     * Versioning is not implemented by this store.
     *
     * @return always {@code null}
     */
    @Override
    public Version getCurrentVersion() {
        return null;
    }

    /**
     * Versioning is not implemented by this store.
     *
     * @return always {@code null}
     */
    @Override
    public Version loadVersion() {
        return null;
    }

    /**
     * Reads and deserializes the home sub-cluster stored for an application.
     *
     * @param applicationId application to look up
     * @return the stored home sub-cluster, or {@code null} if no data was read
     * @throws YarnException if the znode cannot be read or its content cannot be parsed
     */
    private SubClusterId getApp(ApplicationId applicationId) throws YarnException {
        String appZNode = ZKCuratorManager.getNodePath(this.appsZNode, applicationId.toString());
        byte[] data = get(appZNode);
        if (data == null) {
            return null;
        }
        SubClusterId subClusterId = null;
        try {
            subClusterId = new SubClusterIdPBImpl(YarnServerFederationProtos.SubClusterIdProto.parseFrom(data));
        } catch (InvalidProtocolBufferException e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot parse application at " + appZNode);
        }
        return subClusterId;
    }

    /**
     * Serializes and stores the home sub-cluster of an application.
     *
     * @param applicationId application whose entry is written
     * @param subClusterId home sub-cluster; must be a {@link SubClusterIdPBImpl}
     * @param update whether an existing entry may be overwritten
     * @throws YarnException if the znode cannot be created or written
     */
    private void putApp(ApplicationId applicationId, SubClusterId subClusterId, boolean update) throws YarnException {
        String appZNode = ZKCuratorManager.getNodePath(this.appsZNode, applicationId.toString());
        byte[] data = ((SubClusterIdPBImpl) subClusterId).getProto().toByteArray();
        put(appZNode, data, update);
    }

    /**
     * Reads and deserializes the information stored for a sub-cluster.
     *
     * @param subClusterId sub-cluster to look up
     * @return the stored information, or {@code null} if no data was read
     * @throws YarnException if the znode cannot be read or its content cannot be parsed
     */
    private SubClusterInfo getSubclusterInfo(SubClusterId subClusterId) throws YarnException {
        String memberZNode = ZKCuratorManager.getNodePath(this.membershipZNode, subClusterId.toString());
        byte[] data = get(memberZNode);
        if (data == null) {
            return null;
        }
        SubClusterInfo subClusterInfo = null;
        try {
            subClusterInfo = new SubClusterInfoPBImpl(YarnServerFederationProtos.SubClusterInfoProto.parseFrom(data));
        } catch (InvalidProtocolBufferException e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot parse subcluster info at " + memberZNode);
        }
        return subClusterInfo;
    }

    /**
     * Serializes and stores the information of a sub-cluster.
     *
     * @param subClusterId sub-cluster whose entry is written
     * @param subClusterInfo information to store; must be a {@link SubClusterInfoPBImpl}
     * @param update whether an existing entry may be overwritten
     * @throws YarnException if the znode cannot be created or written
     */
    private void putSubclusterInfo(SubClusterId subClusterId, SubClusterInfo subClusterInfo, boolean update) throws YarnException {
        String memberZNode = ZKCuratorManager.getNodePath(this.membershipZNode, subClusterId.toString());
        byte[] data = ((SubClusterInfoPBImpl) subClusterInfo).getProto().toByteArray();
        put(memberZNode, data, update);
    }

    /**
     * Reads and deserializes the policy configuration stored for a queue.
     *
     * @param queue queue name, used as the child znode name
     * @return the stored policy, or {@code null} if no data was read
     * @throws YarnException if the znode cannot be read or its content cannot be parsed
     */
    private SubClusterPolicyConfiguration getPolicy(String queue) throws YarnException {
        String policyZNode = ZKCuratorManager.getNodePath(this.policiesZNode, queue);
        byte[] data = get(policyZNode);
        if (data == null) {
            return null;
        }
        SubClusterPolicyConfiguration policy = null;
        try {
            policy = new SubClusterPolicyConfigurationPBImpl(YarnServerFederationProtos.SubClusterPolicyConfigurationProto.parseFrom(data));
        } catch (InvalidProtocolBufferException e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot parse policy at " + policyZNode);
        }
        return policy;
    }

    /**
     * Serializes and stores the policy configuration of a queue.
     *
     * @param queue queue name, used as the child znode name
     * @param policy policy to store; must be a {@link SubClusterPolicyConfigurationPBImpl}
     * @param update whether an existing entry may be overwritten
     * @throws YarnException if the znode cannot be created or written
     */
    private void putPolicy(String queue, SubClusterPolicyConfiguration policy, boolean update) throws YarnException {
        String policyZNode = ZKCuratorManager.getNodePath(this.policiesZNode, queue);
        byte[] data = ((SubClusterPolicyConfigurationPBImpl) policy).getProto().toByteArray();
        put(policyZNode, data, update);
    }

    /**
     * Reads the raw content of a znode.
     *
     * @param znode full path of the znode
     * @return the data returned by the manager, or {@code null} if the znode
     *         does not exist
     * @throws YarnException if the existence check or the read fails
     */
    private byte[] get(String znode) throws YarnException {
        boolean exists = false;
        try {
            exists = this.zkManager.exists(znode);
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot find znode " + znode);
        }
        if (!exists) {
            LOG.error("{} does not exist", znode);
            return null;
        }

        byte[] data = null;
        try {
            data = this.zkManager.getData(znode);
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot get data from znode " + znode + ": " + e.getMessage());
        }
        return data;
    }

    /**
     * Creates a znode if needed and writes data into it.
     *
     * <p>When the manager reports that the znode was not created (presumably
     * because it already existed - see the log messages below) and
     * {@code update} is {@code false}, the existing content is left untouched.
     *
     * @param znode full path of the znode
     * @param data bytes to store
     * @param update whether the content of a znode that was not newly created
     *        may be overwritten
     * @throws YarnException if the znode cannot be created or written
     */
    private void put(String znode, byte[] data, boolean update) throws YarnException {
        boolean created = false;
        try {
            created = this.zkManager.create(znode);
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot create znode " + znode + ": " + e.getMessage());
        }
        if (!created) {
            LOG.debug("{} not created", znode);
            if (!update) {
                LOG.info("{} already existed and we are not updating", znode);
                return;
            }
        }

        try {
            // The third argument (-1) is passed through as the version;
            // presumably "match any version" in ZooKeeper terms - confirm.
            this.zkManager.setData(znode, data, -1);
        } catch (Exception e) {
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Cannot write data into znode " + znode + ": " + e.getMessage());
        }
    }

    /**
     * Returns the current time in milliseconds since the epoch.
     *
     * <p>Epoch milliseconds do not depend on a time zone, so this equals the
     * value a UTC {@code Calendar} would report.
     *
     * @return the current epoch time in milliseconds
     */
    private static long getCurrentTime() {
        return System.currentTimeMillis();
    }
}
