package io.lighty.core.cluster.kubernetes;

import akka.actor.AbstractActor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.ApiResponse;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ContainerStatus;
import io.kubernetes.client.openapi.models.V1DeleteOptions;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.ClientBuilder;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.ReadTransaction;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.Entity;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/lighty/core/cluster/kubernetes/UnreachableListener.class */
/**
 * Akka actor that subscribes to cluster membership events and decides what to do with members
 * that become unreachable.
 *
 * <p>For every unreachable member the actor:
 * <ol>
 *   <li>refuses to act unless a strict majority of the cluster is still reachable from this node;</li>
 *   <li>asks Kubernetes for the Pods matching the configured namespace/selector and looks for the
 *       Pod whose IP equals the member's host;</li>
 *   <li>downs the member right away when no such Pod exists or its first container is terminated;</li>
 *   <li>otherwise schedules a delayed check which deletes the Pod (and then downs the member) if
 *       the member has not become reachable again in the meantime.</li>
 * </ol>
 *
 * <p>Downing a member marks it as down in Akka, removes its entity-ownership candidates from the
 * operational datastore and invokes the {@code removeAllShardReplicas} RPC for its member names.
 *
 * <p>Threading: cluster events are handled on the actor's thread, while delayed Pod restarts run
 * on a dedicated single scheduler thread owned by this instance.
 */
public class UnreachableListener extends AbstractActor {
    private static final Logger LOG = LoggerFactory.getLogger(UnreachableListener.class);

    /** Delay, in seconds, used when no pod-restart timeout is configured. */
    private static final long DEFAULT_UNREACHABLE_RESTART_TIMEOUT = 30;

    private final Cluster cluster = Cluster.get(getContext().getSystem());
    private final ActorSystem actorSystem;
    private final DataBroker dataBroker;
    private final ClusterAdminService clusterAdminRPCService;
    /** Delay, in seconds, between detecting an unreachable member and restarting its Pod. */
    private final long podRestartTimeout;
    /** Members handled directly in {@link #preStart()}; their replayed events are skipped once. */
    private final Set<Member> initialUnreachableSet = new HashSet<>();
    /**
     * Single shared scheduler for delayed Pod restarts. One instance per actor (instead of one per
     * scheduled restart) so that no executor thread is left behind; shut down in {@link #postStop()}.
     */
    private final ListeningScheduledExecutorService restartScheduler =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
    /** Kubernetes client; {@code null} when the in-cluster configuration could not be loaded. */
    private final CoreV1Api kubernetesApi;
    private final String kubernetesPodNamespace;
    private final String kubernetesPodSelector;

    /**
     * Creates the listener and tries to initialize the in-cluster Kubernetes API client.
     *
     * <p>Note that this installs the created client as the process-wide default via
     * {@link Configuration#setDefaultApiClient}. If the client cannot be built the error is logged
     * and the listener keeps running, but every Kubernetes lookup will then report failure.
     *
     * @param actorSystem actor system used to query the cluster state from scheduled tasks
     * @param dataBroker broker used to read and delete entity-ownership candidates
     * @param clusterAdminRPCService RPC service used to remove shard replicas of downed members
     * @param kubernetesPodNamespace namespace in which Pods are listed and deleted
     * @param kubernetesPodSelector selector passed to the Pod list request
     * @param podRestartTimeout delay in seconds before a Pod restart is attempted; {@code null} or
     *                          {@code 0} selects the default of 30 seconds
     */
    public UnreachableListener(ActorSystem actorSystem, DataBroker dataBroker,
            ClusterAdminService clusterAdminRPCService, String kubernetesPodNamespace,
            String kubernetesPodSelector, Long podRestartTimeout) {
        LOG.info("UnreachableListener created");
        this.dataBroker = dataBroker;
        this.clusterAdminRPCService = clusterAdminRPCService;
        this.actorSystem = actorSystem;
        this.kubernetesPodNamespace = kubernetesPodNamespace;
        this.kubernetesPodSelector = kubernetesPodSelector;

        if (podRestartTimeout == null || podRestartTimeout == 0) {
            this.podRestartTimeout = DEFAULT_UNREACHABLE_RESTART_TIMEOUT;
            LOG.info("Pod-restart-timeout wasn't loaded from akka-config, using default:{}", this.podRestartTimeout);
        } else {
            this.podRestartTimeout = podRestartTimeout;
            LOG.info("Pod-restart-timeout value was loaded from akka-config:{}", this.podRestartTimeout);
        }

        CoreV1Api api = null;
        try {
            Configuration.setDefaultApiClient(ClientBuilder.cluster().build());
            api = new CoreV1Api();
        } catch (IOException e) {
            LOG.error("IOException while initializing cluster ApiClient", e);
        }
        this.kubernetesApi = api;
    }

    /**
     * Builds the {@link Props} used to create this actor.
     *
     * @param actorSystem see the constructor
     * @param dataBroker see the constructor
     * @param clusterAdminRPCService see the constructor
     * @param kubernetesPodNamespace see the constructor
     * @param kubernetesPodSelector see the constructor
     * @param podRestartTimeout see the constructor
     * @return props creating a new {@code UnreachableListener} with the given arguments
     */
    public static Props props(ActorSystem actorSystem, DataBroker dataBroker,
            ClusterAdminService clusterAdminRPCService, String kubernetesPodNamespace,
            String kubernetesPodSelector, Long podRestartTimeout) {
        return Props.create(UnreachableListener.class, () -> new UnreachableListener(actorSystem, dataBroker,
                clusterAdminRPCService, kubernetesPodNamespace, kubernetesPodSelector, podRestartTimeout));
    }

    /**
     * Subscribes to member and unreachability events and immediately processes every member that
     * is already unreachable. Those members are remembered so that the event replayed for them by
     * the {@code initialStateAsEvents} subscription is not processed a second time.
     */
    @Override
    public void preStart() {
        this.cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
                ClusterEvent.MemberEvent.class, ClusterEvent.UnreachableMember.class);
        this.initialUnreachableSet.addAll(this.cluster.state().getUnreachable());
        for (Member member : this.initialUnreachableSet) {
            LOG.info("PreStart: Member detected as unreachable, preparing for downing: {}", member.address());
            processUnreachableMember(member);
        }
    }

    /**
     * Unsubscribes from cluster events and shuts the restart scheduler down.
     *
     * <p>{@code shutdown()} (not {@code shutdownNow()}) is used on purpose: with the default
     * executor policy already scheduled restart checks still run, and the thread exits afterwards.
     */
    @Override
    public void postStop() {
        this.cluster.unsubscribe(getSelf());
        this.restartScheduler.shutdown();
    }

    /**
     * Handles {@link ClusterEvent.UnreachableMember} by processing the member and merely logs
     * {@link ClusterEvent.MemberRemoved}.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return receiveBuilder()
                .match(ClusterEvent.UnreachableMember.class, event -> onUnreachableMember(event.member()))
                .match(ClusterEvent.MemberRemoved.class,
                    removed -> LOG.info("Member was Removed: {}", removed.member()))
                .build();
    }

    /** Processes an unreachable member unless it was already handled during {@link #preStart()}. */
    private void onUnreachableMember(Member member) {
        // remove() reports whether the member was present, so one lookup covers check and removal.
        if (this.initialUnreachableSet.remove(member)) {
            LOG.info("Member {} was already removed during PreStart.", member.address());
        } else {
            LOG.info("Member detected as unreachable, processing: {}", member.address());
            processUnreachableMember(member);
        }
    }

    /**
     * Downs the member if this node sees a majority of the cluster and Kubernetes state indicates
     * that downing is safe; otherwise only logs why nothing was done.
     */
    private void processUnreachableMember(Member member) {
        ClusterEvent.CurrentClusterState state = this.cluster.state();
        // getMembers() is declared as Iterable; the cast mirrors the original code and assumes the
        // returned object is a java.util.Collection.
        int memberCount = ((Collection<?>) state.getMembers()).size();
        int unreachableCount = state.getUnreachable().size();

        if (!isMajorityReachable(memberCount, unreachableCount)) {
            LOG.warn("Majority of cluster seems to be unreachable. This is probably due to network partition in which case the other side will resolve it (since they are the majority). Downing members from this side isn't safe");
        } else if (!safeToDownMember(member)) {
            LOG.info("It is not safe to down member {}", member.address());
        } else {
            downMember(member);
            LOG.info("Downing complete");
        }
    }

    /**
     * Tells whether strictly more than half of the members are reachable.
     *
     * @param memberCount total number of cluster members, unreachable ones included
     * @param unreachableCount number of unreachable members
     * @return {@code true} when {@code memberCount - unreachableCount} is a strict majority
     */
    private static boolean isMajorityReachable(int memberCount, int unreachableCount) {
        // Equivalent to reachable >= floor(memberCount / 2) + 1, without floating point.
        return 2L * (memberCount - unreachableCount) > memberCount;
    }

    /**
     * Returns the member's roles that do not contain
     * {@code KubernetesClusteringHandlerImpl.K8S_DEFAULT_POD_NAMESPACE}. These values are used as
     * member names for the shard-replica RPC and for matching entity-ownership candidates.
     */
    private static List<String> memberNamesOf(Member member) {
        return member.getRoles().stream()
                .filter(role -> !role.contains(KubernetesClusteringHandlerImpl.K8S_DEFAULT_POD_NAMESPACE))
                .collect(Collectors.toList());
    }

    /**
     * Marks the member as down in Akka, removes all shard replicas registered under its member
     * names and deletes its entity-ownership candidates from the operational datastore.
     *
     * <p>Failures of the RPC future or of the commit are logged, not propagated.
     */
    private void downMember(Member member) {
        LOG.info("Downing member {}", member.address());
        List<String> memberNames = memberNamesOf(member);
        this.cluster.down(member.address());

        // Read the candidates before opening the write transaction so that nothing is left open
        // should the lookup fail unexpectedly.
        List<InstanceIdentifier<Candidate>> candidates = getCandidatesFromDatastore(member);
        WriteTransaction deleteTx = this.dataBroker.newWriteOnlyTransaction();
        boolean commitStarted = false;
        try {
            for (InstanceIdentifier<Candidate> candidate : candidates) {
                LOG.debug("Deleting candidate: {}", candidate);
                deleteTx.delete(LogicalDatastoreType.OPERATIONAL, candidate);
            }
            for (String memberName : memberNames) {
                RpcResult<?> result = this.clusterAdminRPCService.removeAllShardReplicas(
                        new RemoveAllShardReplicasInputBuilder().setMemberName(memberName).build()).get();
                if (result.isSuccessful()) {
                    LOG.debug("RPC RemoveAllShards for member {} executed successfully", memberName);
                } else {
                    LOG.warn("RPC RemoveAllShards for member {} failed: {}", memberName, result.getErrors());
                }
            }
            commitStarted = true;
            deleteTx.commit().get();
            LOG.debug("Delete-Candidates transaction was successful");
        } catch (InterruptedException e) {
            // Restore the flag so the owning thread can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("Delete-Candidates transaction failed", e);
        } catch (ExecutionException e) {
            LOG.error("Delete-Candidates transaction failed", e);
        } finally {
            if (!commitStarted) {
                // An RPC failed before commit(): release the transaction instead of leaking it.
                deleteTx.cancel();
            }
        }
    }

    /**
     * Collects the identifiers of all entity-ownership candidates whose name equals one of the
     * member's names (see {@link #memberNamesOf(Member)}).
     *
     * @return identifiers of matching candidates; empty when none match, when the
     *         {@code EntityOwners} container is absent, or when the datastore read fails
     */
    private List<InstanceIdentifier<Candidate>> getCandidatesFromDatastore(Member member) {
        List<String> memberNames = memberNamesOf(member);
        LOG.debug("Getting Candidates from model EntityOwners for member's roles: {}", memberNames);

        Optional<EntityOwners> entityOwners;
        try (ReadTransaction readTx = this.dataBroker.newReadOnlyTransaction()) {
            entityOwners = readTx.read(LogicalDatastoreType.OPERATIONAL,
                    InstanceIdentifier.create(EntityOwners.class)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Couldn't read data from model EntityOwners", e);
            return Collections.emptyList();
        } catch (ExecutionException e) {
            LOG.error("Couldn't read data from model EntityOwners", e);
            return Collections.emptyList();
        }

        List<InstanceIdentifier<Candidate>> candidateIds = new ArrayList<>();
        // The container and any of its child lists may be missing; treat that as "no candidates".
        for (EntityType entityType : nullToEmpty(entityOwners.map(EntityOwners::getEntityType).orElse(null))) {
            for (Entity entity : nullToEmpty(entityType.getEntity())) {
                for (Candidate candidate : nullToEmpty(entity.getCandidate())) {
                    if (memberNames.contains(candidate.getName())) {
                        LOG.debug("Found candidate in shard: {}", entity.getId());
                        candidateIds.add(InstanceIdentifier.builder(EntityOwners.class)
                                .child(EntityType.class, entityType.key())
                                .child(Entity.class, entity.key())
                                .child(Candidate.class, candidate.key())
                                .build());
                    }
                }
            }
        }
        LOG.debug("The removed member is registered as candidate in {}", candidateIds.size());
        LOG.trace("The removed member is registered as: {}", candidateIds);
        return candidateIds;
    }

    /** Returns the given collection, or an empty one when it is {@code null}. */
    private static <T> Collection<T> nullToEmpty(Collection<T> items) {
        return items == null ? Collections.emptyList() : items;
    }

    /**
     * Schedules a check that runs after the pod-restart timeout and restarts the Pod unless the
     * member became reachable again.
     *
     * @param member the unreachable member
     * @param podName name of the member's Pod
     * @return the scheduled future, or {@code null} when {@code podName} is missing or empty
     */
    private ListenableScheduledFuture<?> schedulePodRestart(Member member, String podName) {
        if (podName == null || podName.isEmpty()) {
            LOG.error("Pod name was missing or empty. Can't schedule Pod restart.");
            return null;
        }
        LOG.info("Before restarting wait {}s. If member becomes reachable again, restart will be aborted", this.podRestartTimeout);
        return this.restartScheduler.schedule(() -> restartIfStillUnreachable(member, podName),
                this.podRestartTimeout, TimeUnit.SECONDS);
    }

    /** Body of the delayed task: re-reads the cluster state and restarts the Pod if still needed. */
    private void restartIfStillUnreachable(Member member, String podName) {
        try {
            ClusterEvent.CurrentClusterState state = Cluster.get(this.actorSystem).state();
            if (!((Collection<?>) state.getMembers()).contains(member)) {
                LOG.warn("Member {} is no longer listed among other cluster members. Trying to restart it.", podName);
                sendRestartRequest(member, podName);
            } else if (!state.getUnreachable().contains(member)) {
                LOG.debug("Member {} is reachable again. Aborting POD restart", podName);
            } else {
                LOG.debug("Requesting Kubernetes to restart the pod {}", podName);
                sendRestartRequest(member, podName);
            }
        } catch (RuntimeException e) {
            // Nobody inspects the returned future, so an uncaught failure would vanish silently.
            LOG.error("Scheduled restart check for Pod {} failed", podName, e);
        }
    }

    /**
     * Asks Kubernetes to delete the Pod and downs the member when the deletion succeeded or the
     * Pod no longer exists (HTTP 404). Any other outcome is logged and the member is left alone.
     */
    private void sendRestartRequest(Member member, String podName) {
        LOG.info("Member didn't return to reachable state, trying to restart its Pod");
        if (this.kubernetesApi == null) {
            LOG.error("Kubernetes API client is not initialized. Can't restart Pod {}. Not safe to down member.", podName);
            return;
        }
        try {
            ApiResponse<?> response = this.kubernetesApi.deleteNamespacedPodWithHttpInfo(
                    podName, this.kubernetesPodNamespace, null, null, null, null, null, null);
            int statusCode = response.getStatusCode();
            if (statusCode >= 200 && statusCode < 300) {
                LOG.info("Request successful. Kubernetes will restart Pod with name: {}", podName);
                downMember(member);
            } else if (statusCode == 404) {
                LOG.info("Request to delete Pod {} failed because the pod no longer exists. Safe to down member.", podName);
                downMember(member);
            } else {
                LOG.error("Request to delete Pod {} failed. Not safe to down member. Response from Kubernetes: {}", podName, response);
            }
        } catch (ApiException e) {
            LOG.debug("ApiException on api.deleteNamespacedPodWithHttpInfo", e);
            if (e.getCode() == 404) {
                LOG.info("Request to delete Pod {} failed because the pod no longer exists. Safe to down member.", podName);
                downMember(member);
            } else {
                LOG.error("Unhandled response from API on api.deleteNamedSpacedPod with response code {} . Not safe to down member. ", e.getCode());
            }
        }
    }

    /**
     * Decides whether the unreachable member can be downed right now.
     *
     * <p>Side effect: when the member's Pod exists and its first container is not terminated, a
     * delayed Pod restart is scheduled and {@code false} is returned.
     *
     * @param member the unreachable member
     * @return {@code true} when no Pod has the member's IP or the Pod's first container is
     *         terminated; {@code false} when the Pod list is unavailable, the member has no host,
     *         or the Pod still appears to be running
     */
    public boolean safeToDownMember(Member member) {
        Optional<V1PodList> pods = getAllLightyPods();
        if (pods.isEmpty()) {
            LOG.error("List of Pods wasn't received. Can't decide whether it's safe to Down the unreachable member {}", member.address());
            return false;
        }
        if (member.address().host().isEmpty()) {
            LOG.error("Unreachable member {} has no host in its address. Can't match it to a Pod", member.address());
            return false;
        }
        String memberIp = member.address().host().get();
        LOG.debug("Address of unreachable member is: {}", memberIp);

        for (V1Pod pod : pods.get().getItems()) {
            // A Pod that has not been assigned an IP yet reports null here.
            String podIp = pod.getStatus() == null ? null : pod.getStatus().getPodIP();
            LOG.debug("Pod: {} has PodIP: {}", podNameOf(pod), podIp);
            if (memberIp.equals(podIp)) {
                LOG.debug("IP of unreachable was found in Pods List. Checking container state");
                return analyzePodState(member, pod);
            }
        }
        LOG.debug("IP of unreachable was not found in Pods List.. it is safe to delete it");
        return true;
    }

    /**
     * Lists the Pods in the configured namespace that match the configured selector.
     *
     * @return the Pod list, or empty when the client is not initialized, the request fails, or the
     *         response status is not 2xx
     */
    private Optional<V1PodList> getAllLightyPods() {
        LOG.debug("Getting Lighty Pods from Kubernetes");
        if (this.kubernetesApi == null) {
            LOG.warn("Kubernetes API client is not initialized, can't retrieve Pods List");
            return Optional.empty();
        }
        try {
            // The selector is passed in the 6th position, which the client API declares as the
            // label selector.
            ApiResponse<V1PodList> response = this.kubernetesApi.listNamespacedPodWithHttpInfo(
                    this.kubernetesPodNamespace, null, null, null, null, this.kubernetesPodSelector,
                    null, null, null, null);
            int statusCode = response.getStatusCode();
            if (statusCode < 200 || statusCode >= 300) {
                LOG.warn("Error retrieving Pods List , Http status code = {}", statusCode);
                return Optional.empty();
            }
            LOG.info("Successfully retrieved Pods List");
            return Optional.ofNullable(response.getData());
        } catch (ApiException e) {
            LOG.debug("ApiException on api.listNamespacedPodWithHttpInfo", e);
            LOG.warn("Error retrieving Pods List , Http status code = {}", e.getCode());
            return Optional.empty();
        }
    }

    /**
     * Inspects the first container status of the member's Pod.
     *
     * <p>Only the first entry of the container status list is considered. The caller guarantees
     * that {@code pod.getStatus()} is non-null.
     *
     * @return {@code true} when that container is not ready and in terminated state; otherwise a
     *         Pod restart is scheduled and {@code false} is returned
     */
    private boolean analyzePodState(Member member, V1Pod pod) {
        List<V1ContainerStatus> statuses = pod.getStatus().getContainerStatuses();
        if (statuses == null || statuses.isEmpty()) {
            LOG.warn("ContainerStatuses list missing or empty");
            LOG.debug("ContainerStatuses detail: {}", statuses);
        } else {
            V1ContainerStatus first = statuses.get(0);
            // A missing readiness flag is treated as "not ready".
            if (Boolean.TRUE.equals(first.getReady())) {
                LOG.debug("ContainerStatus is READY");
            } else {
                LOG.debug("State of the container is - {} ", first.getState());
                if (first.getState() != null && first.getState().getTerminated() != null) {
                    LOG.debug("Found state container - Terminated, safe to Down member");
                    return true;
                }
                LOG.debug("State of the container is not terminated");
            }
        }
        schedulePodRestart(member, podNameOf(pod));
        return false;
    }

    /** Returns the Pod's name, or {@code null} when the Pod carries no metadata. */
    private static String podNameOf(V1Pod pod) {
        return pod.getMetadata() == null ? null : pod.getMetadata().getName();
    }
}
