package com.torodb.mongodb.repl.topology;

import com.google.common.net.HostAndPort;
import com.torodb.common.util.CompletionExceptions;
import com.torodb.core.logging.LoggerFactory;
import com.torodb.core.services.IdleTorodbService;
import com.torodb.mongodb.commands.pojos.ReplicaSetConfig;
import com.torodb.mongodb.commands.signatures.internal.ReplSetHeartbeatCommand;
import com.torodb.mongodb.commands.signatures.internal.ReplSetHeartbeatReply;
import com.torodb.mongodb.repl.guice.ReplSetName;
import com.torodb.mongodb.repl.topology.HeartbeatResponseAction;
import com.torodb.mongowp.ErrorCode;
import com.torodb.mongowp.Status;
import com.torodb.mongowp.client.core.MongoConnection;
import com.torodb.mongowp.client.core.UnreachableMongoServerException;
import com.torodb.mongowp.commands.MongoRuntimeException;
import com.torodb.mongowp.commands.tools.Empty;
import com.torodb.mongowp.exceptions.InconsistentReplicaSetNamesException;
import com.torodb.mongowp.exceptions.MongoException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.logging.log4j.Logger;
import org.jooq.lambda.UncheckedException;

@Singleton
/* loaded from: input_file:com/torodb/mongodb/repl/topology/TopologyHeartbeatHandler.class */
public class TopologyHeartbeatHandler extends IdleTorodbService {
    // Logger obtained from the injected LoggerFactory for this class.
    private final Logger logger;
    // Remote node that startUp() asks for the replica set configuration.
    private final HostAndPort seed;
    // Time source used to timestamp heartbeat requests, responses and config updates.
    private final Clock clock;
    // Local replica set name; remote configurations with a different name are rejected.
    private final String replSetName;
    // Sends the config request and the heartbeat requests to remote nodes.
    private final HeartbeatNetworkHandler networkHandler;
    // Executor on which all topology coordinator callbacks of this class are run.
    private final TopologyExecutor executor;
    // Consulted on heartbeat failures to decide whether the handler keeps running.
    private final TopologyErrorHandler errorHandler;
    // Listener (bound to scheduleHeartbeats) registered on the executor by start().
    private final VersionChangeListener versionChangeListener;

    // Set to true by shutDown(); doHeartbeat() skips its work once this is set.
    @GuardedBy("executor")
    private boolean stopped;
    // Decompiler-exposed assertion flag: true when assertions are disabled for this class.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.torodb.mongodb.repl.topology.TopologyHeartbeatHandler$1, reason: invalid class name */
    /* loaded from: input_file:com/torodb/mongodb/repl/topology/TopologyHeartbeatHandler$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$torodb$mongowp$ErrorCode;

        static {
            try {
                $SwitchMap$com$torodb$mongodb$repl$topology$HeartbeatResponseAction$Action[HeartbeatResponseAction.Action.NO_ACTION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$torodb$mongodb$repl$topology$HeartbeatResponseAction$Action[HeartbeatResponseAction.Action.RECONFIG.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$torodb$mongodb$repl$topology$HeartbeatResponseAction$Action[HeartbeatResponseAction.Action.START_ELECTION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$torodb$mongodb$repl$topology$HeartbeatResponseAction$Action[HeartbeatResponseAction.Action.STEP_DOWN_SELF.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$torodb$mongodb$repl$topology$HeartbeatResponseAction$Action[HeartbeatResponseAction.Action.STEP_DOWN_REMOTE_PRIMARY.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            $SwitchMap$com$torodb$mongowp$ErrorCode = new int[ErrorCode.values().length];
            try {
                $SwitchMap$com$torodb$mongowp$ErrorCode[ErrorCode.NO_REPLICATION_ENABLED.ordinal()] = 1;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$torodb$mongowp$ErrorCode[ErrorCode.INCONSISTENT_REPLICA_SET_NAMES.ordinal()] = 2;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/torodb/mongodb/repl/topology/TopologyHeartbeatHandler$UnsupportedHeartbeatResponseActionException.class */
    /**
     * Thrown when a heartbeat response asks for an action this handler does not implement.
     *
     * <p>Carries the requested action and, when there was one, the heartbeat reply that led to
     * it. The reply is {@code transient}, so it is not part of the serialized form.
     */
    public static class UnsupportedHeartbeatResponseActionException extends RuntimeException {
        private static final long serialVersionUID = 8879568483145061898L;

        private final HeartbeatResponseAction action;

        @Nullable
        private final transient ReplSetHeartbeatReply reply;

        /**
         * @param heartbeatResponseAction the action that cannot be executed
         * @param replSetHeartbeatReply   the reply that produced the action, may be {@code null}
         */
        public UnsupportedHeartbeatResponseActionException(HeartbeatResponseAction heartbeatResponseAction, ReplSetHeartbeatReply replSetHeartbeatReply) {
            super(describe(heartbeatResponseAction));
            this.reply = replSetHeartbeatReply;
            this.action = heartbeatResponseAction;
        }

        /** Builds the exception message for the given unsupported action. */
        private static String describe(HeartbeatResponseAction unsupported) {
            StringBuilder text = new StringBuilder("Heartbeat action ");
            text.append(unsupported.getAction());
            text.append(" is not supported");
            return text.toString();
        }

        /** @return the action that was requested but is not supported */
        public HeartbeatResponseAction getAction() {
            return action;
        }

        /** @return the heartbeat reply associated with the action, or {@code null} */
        @Nullable
        public ReplSetHeartbeatReply getReply() {
            return reply;
        }
    }

    /**
     * Wires the handler with its collaborators; no heartbeat is sent until the service starts.
     *
     * @param clock                   time source for heartbeat and config timestamps
     * @param str                     name of the local replica set
     * @param loggerFactory           factory used to obtain this class' logger
     * @param heartbeatNetworkHandler component that talks to the remote nodes
     * @param topologyExecutor        executor on which topology callbacks are run
     * @param topologyErrorHandler    decides whether to go on after a heartbeat error
     * @param threadFactory           handed to the parent service
     * @param hostAndPort             remote seed asked for the replica set configuration
     */
    @Inject
    public TopologyHeartbeatHandler(
            Clock clock,
            @ReplSetName String str,
            LoggerFactory loggerFactory,
            HeartbeatNetworkHandler heartbeatNetworkHandler,
            TopologyExecutor topologyExecutor,
            TopologyErrorHandler topologyErrorHandler,
            ThreadFactory threadFactory,
            @RemoteSeed HostAndPort hostAndPort) {
        super(threadFactory);

        // Identity of the replica set and of the node used to bootstrap from.
        this.replSetName = str;
        this.seed = hostAndPort;

        // Collaborators.
        this.clock = clock;
        this.executor = topologyExecutor;
        this.networkHandler = heartbeatNetworkHandler;
        this.errorHandler = topologyErrorHandler;
        this.logger = loggerFactory.apply(getClass());

        // Registered on the executor by start(); schedules heartbeats when invoked.
        this.versionChangeListener = this::scheduleHeartbeats;
    }

    /**
     * Returns the human readable name of this service, as used in its start/stop log messages.
     *
     * @return the constant {@code "Heartbeat handler"}
     */
    protected final String serviceName() {
        return "Heartbeat handler";
    }

    /**
     * Starts the heartbeats against the seed and blocks until that succeeds.
     *
     * <p>Each attempt calls {@link #start(HostAndPort)} and waits for its outcome; a failed
     * attempt is retried after sleeping one second, with no upper bound on the number of retries.
     *
     * @throws Exception if the thread is interrupted while sleeping between attempts, or on any
     *                   other failure propagated by the attempt
     */
    protected void startUp() throws Exception {
        logger.debug("Starting up {}", serviceName());
        for (;;) {
            // checkHeartbeatStarted turns both failures and bad statuses into "false".
            final boolean started = start(seed).handle(this::checkHeartbeatStarted).join();
            if (started) {
                break;
            }
            logger.debug("Retrying to start heartbeats in 1 second");
            Thread.sleep(1000L);
        }
        logger.debug("{} has been started up", serviceName());
    }

    /**
     * Stops the handler by raising the {@code stopped} flag on the topology executor and
     * waiting until that task has run; heartbeats executed afterwards become no-ops.
     *
     * @throws Exception if the task that raises the flag fails
     */
    protected void shutDown() throws Exception {
        final String name = serviceName();
        logger.debug("Shutting down {}", name);
        // The flag is guarded by the executor, so it is written from an executor task.
        executor.onAnyVersion()
            .consumeAsync(coordinator -> {
                stopped = true;
            })
            .join();
        logger.debug("{} has been shutted down", name);
    }

    /**
     * Interprets the outcome of a {@link #start(HostAndPort)} attempt and logs it.
     *
     * @param status the status the attempt completed with, meaningful only when {@code th} is
     *               {@code null}
     * @param th     the failure the attempt completed with, or {@code null} if it did not fail
     * @return {@code true} only if the attempt completed normally with an OK status
     */
    @GuardedBy("any")
    private boolean checkHeartbeatStarted(Status<?> status, Throwable th) {
        if (th == null) {
            if (status.isOk()) {
                logger.trace("Heartbeat started correctly");
                return true;
            }
            logger.debug("Heartbeat start failed: {}", status);
            switch (status.getErrorCode()) {
                case NO_REPLICATION_ENABLED:
                    logger.warn("The sync source {} is not running with replication enabled", seed);
                    break;
                case INCONSISTENT_REPLICA_SET_NAMES:
                default:
                    logger.warn(status.getErrorMsg());
                    break;
            }
            return false;
        }

        // Exceptional completion: report the most meaningful cause available.
        Throwable reported = CompletionExceptions.getFirstNonCompletionException(th);
        if (reported instanceof UncheckedException && reported.getCause() != null) {
            // Unwrap a single level so the wrapped exception is the one shown.
            reported = reported.getCause();
        }
        logger.warn("Heartbeat start failed (sync source: " + seed + "): " + reported.getLocalizedMessage(), reported);
        return false;
    }

    /**
     * Registers the version change listener and asks the given node for its replica set
     * configuration, applying it to the topology coordinator when it is acceptable.
     *
     * <p>NOTE(review): {@code startUp()} calls this method once per retry, so the listener is
     * registered again on every attempt; confirm the executor tolerates repeated registration.
     *
     * @param hostAndPort the node to ask for the configuration
     * @return a future with the status of the request: the remote status when the request failed
     *         or the config was applied, or a status built from the
     *         {@link InconsistentReplicaSetNamesException} when the remote replica set name
     *         does not match the local one
     */
    CompletableFuture<Status<ReplicaSetConfig>> start(HostAndPort hostAndPort) {
        executor.addVersionChangeListener(versionChangeListener);
        return executor.onCurrentVersion().andThenApplyAsync(
            networkHandler.askForConfig(new RemoteCommandRequest<>(hostAndPort, "admin", Empty.getInstance())),
            (coordinator, response) -> {
                Status configStatus = response.asStatus();
                if (configStatus.isOk()) {
                    ReplicaSetConfig remoteConfig = (ReplicaSetConfig) configStatus.getResult();
                    try {
                        checkRemoteReplSetConfig(remoteConfig);
                        updateConfig(coordinator, remoteConfig);
                    } catch (InconsistentReplicaSetNamesException nameMismatch) {
                        return Status.from(nameMismatch);
                    }
                }
                // Either the remote error as received, or the OK status whose config was applied.
                return configStatus;
            });
    }

    /**
     * Verifies that a remote configuration belongs to the local replica set.
     *
     * @param replicaSetConfig the configuration received from a remote node
     * @throws InconsistentReplicaSetNamesException if its replica set name is not equal to the
     *                                              local replica set name
     */
    private void checkRemoteReplSetConfig(ReplicaSetConfig replicaSetConfig) throws InconsistentReplicaSetNamesException {
        final String remoteName = replicaSetConfig.getReplSetName();
        if (this.replSetName.equals(remoteName)) {
            return;
        }
        throw new InconsistentReplicaSetNamesException("The remote replica set configuration is named as '" + remoteName + "', which differs with the local replica set name '" + this.replSetName + "'");
    }

    /**
     * Schedules an immediate heartbeat to every member of the coordinator's current replica
     * set configuration.
     *
     * @param topologyCoordinator the coordinator whose configuration lists the members
     * @param replicaSetConfig    not read by this method; members are taken from the coordinator
     */
    @GuardedBy("executor")
    private void scheduleHeartbeats(TopologyCoordinator topologyCoordinator, ReplicaSetConfig replicaSetConfig) {
        logger.debug("Scheduling new heartbeats to nodes on config {}", topologyCoordinator.getRsConfig().getConfigVersion());
        topologyCoordinator.getRsConfig()
            .getMembers()
            .stream()
            .forEach(member -> scheduleHeartbeatToTarget(member.getHostAndPort(), Duration.ZERO));
    }

    /**
     * Schedules a single heartbeat to the given node after the given delay.
     *
     * @param hostAndPort the node that will receive the heartbeat
     * @param duration    how long to wait before the heartbeat is executed
     * @return the future returned by the executor for the scheduled task
     */
    @GuardedBy("any")
    private CompletableFuture<?> scheduleHeartbeatToTarget(HostAndPort hostAndPort, Duration duration) {
        logger.trace("Scheduling heartbeat to {} in {}", hostAndPort, duration);
        return executor.onCurrentVersion()
            .scheduleOnce(coordinator -> doHeartbeat(coordinator, hostAndPort), duration);
    }

    /**
     * Sends one heartbeat to the given node, unless the handler has been stopped.
     *
     * <p>The request is prepared by the coordinator and sent through the network handler.
     * Failures of the request are mapped by {@link #onNetworkError}, and the resulting response
     * is processed on the executor by {@link #handleHeartbeatResponse}.
     *
     * @param topologyCoordinator the coordinator used to prepare the request
     * @param hostAndPort         the node that receives the heartbeat
     */
    @GuardedBy("executor")
    private void doHeartbeat(TopologyCoordinator topologyCoordinator, HostAndPort hostAndPort) {
        if (!stopped) {
            // Remembered so a failed request can still report how long it took.
            final Instant sentAt = clock.instant();
            final RemoteCommandRequest<ReplSetHeartbeatCommand.ReplSetHeartbeatArgument> request =
                topologyCoordinator.prepareHeartbeatRequest(sentAt, replSetName, hostAndPort);
            executor.onCurrentVersion().andThenAcceptAsync(
                networkHandler.sendHeartbeat(request)
                    .exceptionally(failure -> onNetworkError(failure, hostAndPort, sentAt)),
                (coordinator, response) -> handleHeartbeatResponse(
                    coordinator,
                    hostAndPort,
                    (ReplSetHeartbeatCommand.ReplSetHeartbeatArgument) request.getCmdObj(),
                    response));
            return;
        }
        logger.trace("Ignoring heartbeat to {} because the handler has been stopped", hostAndPort);
    }

    /**
     * Maps a failed heartbeat request to an erroneous response, or aborts the handler.
     *
     * <p>The failure is first unwrapped: completion wrappers are removed and then every
     * {@link UncheckedException} layer that actually has a cause. The unwrapping stops when a
     * layer has no cause, so an {@code UncheckedException} built without one is reported itself
     * instead of triggering a {@link NullPointerException}.
     *
     * @param th          the failure the heartbeat request completed with
     * @param hostAndPort the node the heartbeat was sent to
     * @param instant     the instant at which the heartbeat was started
     * @return the response that stands in for the failed request, when the error handler
     *         accepts to go on
     * @throws CancellationException if the request was cancelled, or if the error handler asks
     *                               to abort (in which case the service is also asked to stop)
     */
    @GuardedBy("any")
    private MongoConnection.RemoteCommandResponse<ReplSetHeartbeatReply> onNetworkError(Throwable th, HostAndPort hostAndPort, Instant instant) {
        Throwable cause = CompletionExceptions.getFirstNonCompletionException(th);
        while (cause instanceof UncheckedException) {
            Throwable nested = cause.getCause();
            // Throwable.getCause() reports "no cause" as null, never as the throwable itself.
            if (nested == null || nested == cause) {
                break;
            }
            cause = nested;
        }
        if (cause instanceof CancellationException) {
            this.logger.trace("Heartbeat handling to {} has been cancelled before execution: {}", hostAndPort, cause.getMessage());
            throw ((CancellationException) cause);
        }
        // The original (still wrapped) throwable is logged to keep the complete stack trace.
        this.logger.debug("Error while on the heartbeat request sent to " + hostAndPort, th);
        if (this.errorHandler.reciveHeartbeatError(cause)) {
            MongoConnection.RemoteCommandResponse<ReplSetHeartbeatReply> errorResponse = handleHeartbeatError(cause, instant);
            this.logger.trace("Handled with a response with error {}", errorResponse.getErrorCode());
            return errorResponse;
        }
        this.logger.trace("Aborting execution as requested by the topology supervisor");
        stopAsync();
        throw new CancellationException("Aborting execution as requested by the topology supervisor");
    }

    /**
     * Builds the erroneous response that represents a failed heartbeat request.
     *
     * <p>The network time of the response is the time elapsed from {@code instant} until now.
     * The arguments of {@link Duration#between} used to be swapped, which produced a negative
     * duration.
     *
     * @param th      the already unwrapped failure
     * @param instant the instant at which the heartbeat was started
     * @return a response built from the exception when it is a {@link MongoException};
     *         otherwise a response with {@code HOST_UNREACHABLE} for
     *         {@link UnreachableMongoServerException} and {@code UNKNOWN_ERROR} for anything else
     */
    @Nonnull
    private MongoConnection.RemoteCommandResponse<ReplSetHeartbeatReply> handleHeartbeatError(Throwable th, Instant instant) {
        // Duration.between(start, end): the start of the request goes first.
        Duration elapsed = Duration.between(instant, this.clock.instant());
        if (th instanceof MongoException) {
            return new MongoConnection.FromExceptionRemoteCommandRequest((MongoException) th, elapsed);
        }
        ErrorCode errorCode;
        if (th instanceof UnreachableMongoServerException) {
            errorCode = ErrorCode.HOST_UNREACHABLE;
        } else {
            // Unreachable-server failures were handled above, so only the runtime check remains.
            if (!(th instanceof MongoRuntimeException)) {
                this.logger.warn("Unexpected exception {} catched by the topology heartbeat handler", th.getClass().getSimpleName());
            }
            errorCode = ErrorCode.UNKNOWN_ERROR;
        }
        return new MongoConnection.ErroneousRemoteCommandResponse(errorCode, th.getLocalizedMessage(), elapsed);
    }

    /**
     * Processes the response (successful or erroneous) of a heartbeat sent to a node.
     *
     * <p>The response is handed to the coordinator, the next heartbeat to the same node is
     * scheduled with the delay the coordinator asks for, and finally the action requested by
     * the coordinator is executed. The next heartbeat is scheduled before the action runs, so
     * it is scheduled even when the action ends up throwing.
     *
     * @param topologyCoordinator      the coordinator that evaluates the response
     * @param hostAndPort              the node the heartbeat was sent to
     * @param replSetHeartbeatArgument the argument of the request; not read by this method
     * @param remoteCommandResponse    the response, or the error standing in for it
     */
    @GuardedBy("executor")
    private void handleHeartbeatResponse(TopologyCoordinator topologyCoordinator, HostAndPort hostAndPort, ReplSetHeartbeatCommand.ReplSetHeartbeatArgument replSetHeartbeatArgument, MongoConnection.RemoteCommandResponse<ReplSetHeartbeatReply> remoteCommandResponse) {
        boolean authProblem = remoteCommandResponse.getErrorCode() == ErrorCode.UNAUTHORIZED || remoteCommandResponse.getErrorCode() == ErrorCode.AUTHENTICATION_FAILED;
        Instant now = this.clock.instant();
        // The network time is only reported for successful requests and for authentication
        // errors; for any other error the coordinator receives a zero duration.
        Duration networkTime = Duration.ZERO;
        if (remoteCommandResponse.isOk()) {
            networkTime = remoteCommandResponse.getNetworkTime();
        } else {
            this.logger.warn("Error in heartbeat request to {}; {}", hostAndPort, remoteCommandResponse.asStatus());
            if (remoteCommandResponse.getBson() != null) {
                // The "{}" placeholder was missing, so the BSON document was never printed.
                this.logger.debug("heartbeat response: {}", remoteCommandResponse.getBson());
            }
            if (authProblem) {
                networkTime = remoteCommandResponse.getNetworkTime();
            }
        }
        HeartbeatResponseAction action = topologyCoordinator.processHeartbeatResponse(now, networkTime, hostAndPort, remoteCommandResponse);
        ReplSetHeartbeatReply hbReply = (ReplSetHeartbeatReply) remoteCommandResponse.getCommandReply().orElse(null);
        if (!$assertionsDisabled && hbReply == null && remoteCommandResponse.isOk()) {
            throw new AssertionError("Recived a null hbReply when the request didn't fail");
        }
        scheduleHeartbeatToTarget(hostAndPort, action.getNextHeartbeatDelay());
        handleHeartbeatResponseAction(topologyCoordinator, action, hbReply, remoteCommandResponse.getErrorCode());
    }

    /**
     * Executes the action the coordinator requested after processing a heartbeat response.
     *
     * <p>{@code NO_ACTION} does nothing and {@code RECONFIG} applies the configuration carried
     * by the reply. The election and step-down actions are not supported.
     *
     * @param topologyCoordinator     the coordinator that receives a new configuration
     * @param heartbeatResponseAction the requested action
     * @param replSetHeartbeatReply   the heartbeat reply; expected to be present and to carry a
     *                                configuration for {@code RECONFIG}
     * @param errorCode               error code of the response; not read by this method
     * @throws UnsupportedHeartbeatResponseActionException for {@code START_ELECTION},
     *         {@code STEP_DOWN_SELF} and {@code STEP_DOWN_REMOTE_PRIMARY}
     */
    @GuardedBy("executor")
    private void handleHeartbeatResponseAction(TopologyCoordinator topologyCoordinator, HeartbeatResponseAction heartbeatResponseAction, @Nullable ReplSetHeartbeatReply replSetHeartbeatReply, ErrorCode errorCode) throws UnsupportedHeartbeatResponseActionException {
        switch (heartbeatResponseAction.getAction()) {
            case NO_ACTION:
                break;
            case RECONFIG:
                // Only checked when assertions are enabled for this class.
                if (!$assertionsDisabled && (replSetHeartbeatReply == null || !replSetHeartbeatReply.getConfig().isPresent())) {
                    throw new AssertionError();
                }
                updateConfig(topologyCoordinator, (ReplicaSetConfig) replSetHeartbeatReply.getConfig().get());
                break;
            case START_ELECTION:
            case STEP_DOWN_SELF:
            case STEP_DOWN_REMOTE_PRIMARY:
                throw new UnsupportedHeartbeatResponseActionException(heartbeatResponseAction, replSetHeartbeatReply);
            default:
                logger.error("Illegal heartbeat response action code {}", heartbeatResponseAction.getAction());
                throw new AssertionError();
        }
    }

    /**
     * Validates a replica set configuration and installs it on the coordinator, stamped with
     * the current instant.
     *
     * @param topologyCoordinator the coordinator that receives the configuration
     * @param replicaSetConfig    the configuration to install
     */
    @GuardedBy("executor")
    private void updateConfig(TopologyCoordinator topologyCoordinator, ReplicaSetConfig replicaSetConfig) {
        validateConfig(topologyCoordinator, replicaSetConfig);
        final Instant appliedAt = clock.instant();
        topologyCoordinator.updateConfig(replicaSetConfig, appliedAt);
    }

    /**
     * Placeholder for the validation of a new configuration: nothing is checked, the
     * acceptance is only logged.
     *
     * @param topologyCoordinator not read by this method
     * @param replicaSetConfig    the configuration being accepted
     */
    @GuardedBy("executor")
    private void validateConfig(TopologyCoordinator topologyCoordinator, ReplicaSetConfig replicaSetConfig) {
        final int version = replicaSetConfig.getConfigVersion();
        logger.debug("Accepting the new replica set config (version is {}) without validating it first (not supported yet)", version);
    }

    // Decompiled form of the compiler's assertion support: caches, once per class load, whether
    // assertions are disabled so the explicit "if (!$assertionsDisabled && ...)" checks in this
    // class only fire when assertions are enabled.
    static {
        $assertionsDisabled = !TopologyHeartbeatHandler.class.desiredAssertionStatus();
    }
}
