package io.scalecube.cluster.fdetector;

import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MemberStatus;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.transport.Message;
import io.scalecube.transport.Transport;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/scalecube/cluster/fdetector/FailureDetectorImpl.class */
public final class FailureDetectorImpl implements FailureDetector {
    private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetectorImpl.class);
    public static final String PING = "sc/fdetector/ping";
    public static final String PING_REQ = "sc/fdetector/pingReq";
    public static final String PING_ACK = "sc/fdetector/pingAck";
    private final Transport transport;
    private final Supplier<Member> memberSupplier;
    private final FailureDetectorConfig config;
    private long period = 0;
    private List<Member> pingMembers = new ArrayList();
    private int pingMemberIndex = 0;
    private final Disposable.Composite actionsDisposables = Disposables.composite();
    private final FluxProcessor<FailureDetectorEvent, FailureDetectorEvent> subject = DirectProcessor.create().serialize();
    private final FluxSink<FailureDetectorEvent> sink = this.subject.sink();
    private final Scheduler scheduler;

    public FailureDetectorImpl(Supplier<Member> supplier, Transport transport, Flux<MembershipEvent> flux, FailureDetectorConfig failureDetectorConfig, Scheduler scheduler) {
        this.memberSupplier = (Supplier) Objects.requireNonNull(supplier);
        this.transport = (Transport) Objects.requireNonNull(transport);
        this.config = (FailureDetectorConfig) Objects.requireNonNull(failureDetectorConfig);
        this.scheduler = (Scheduler) Objects.requireNonNull(scheduler);
        this.actionsDisposables.addAll(Arrays.asList(flux.publishOn(scheduler).subscribe(this::onMemberEvent, this::onError), transport.listen().publishOn(scheduler).subscribe(this::onMessage, this::onError)));
    }

    @Override // io.scalecube.cluster.fdetector.FailureDetector
    public void start() {
        this.actionsDisposables.add(this.scheduler.schedulePeriodically(this::doPing, this.config.getPingInterval(), this.config.getPingInterval(), TimeUnit.MILLISECONDS));
    }

    @Override // io.scalecube.cluster.fdetector.FailureDetector
    public void stop() {
        this.actionsDisposables.dispose();
        this.sink.complete();
    }

    @Override // io.scalecube.cluster.fdetector.FailureDetector
    public Flux<FailureDetectorEvent> listen() {
        return this.subject.onBackpressureBuffer();
    }

    private void doPing() {
        this.period++;
        Member selectPingMember = selectPingMember();
        if (selectPingMember == null) {
            return;
        }
        Member member = this.memberSupplier.get();
        String str = member.id() + "-" + Long.toString(this.period);
        Message build = Message.withData(new PingData(member, selectPingMember)).qualifier(PING).correlationId(str).build();
        this.transport.listen().filter(this::isPingAck).filter(message -> {
            return str.equals(message.correlationId());
        }).take(1L).timeout(Duration.ofMillis(this.config.getPingTimeout()), this.scheduler).publishOn(this.scheduler).subscribe(message2 -> {
            LOGGER.trace("Received PingAck[{}] from {}", Long.valueOf(this.period), selectPingMember);
            publishPingResult(selectPingMember, MemberStatus.ALIVE);
        }, th -> {
            LOGGER.trace("Timeout getting PingAck[{}] from {} within {} ms", new Object[]{Long.valueOf(this.period), selectPingMember, Integer.valueOf(this.config.getPingTimeout())});
            doPingReq(selectPingMember, str);
        });
        LOGGER.trace("Send Ping[{}] to {}", Long.valueOf(this.period), selectPingMember);
        this.transport.send(selectPingMember.address(), build).subscribe();
    }

    private void doPingReq(Member member, String str) {
        int pingInterval = this.config.getPingInterval() - this.config.getPingTimeout();
        if (pingInterval <= 0) {
            LOGGER.trace("No PingReq[{}] occurred, because no time left (pingInterval={}, pingTimeout={})", new Object[]{Long.valueOf(this.period), Integer.valueOf(this.config.getPingInterval()), Integer.valueOf(this.config.getPingTimeout())});
            publishPingResult(member, MemberStatus.SUSPECT);
            return;
        }
        List<Member> selectPingReqMembers = selectPingReqMembers(member);
        if (selectPingReqMembers.isEmpty()) {
            LOGGER.trace("No PingReq[{}] occurred, because member selection is empty", Long.valueOf(this.period));
            publishPingResult(member, MemberStatus.SUSPECT);
            return;
        }
        Member member2 = this.memberSupplier.get();
        this.transport.listen().filter(this::isPingAck).filter(message -> {
            return str.equals(message.correlationId());
        }).take(1L).timeout(Duration.ofMillis(pingInterval), this.scheduler).publishOn(this.scheduler).subscribe(message2 -> {
            LOGGER.trace("Received transit PingAck[{}] from {} to {}", new Object[]{Long.valueOf(this.period), message2.sender(), member});
            publishPingResult(member, MemberStatus.ALIVE);
        }, th -> {
            LOGGER.trace("Timeout getting transit PingAck[{}] from {} to {} within {} ms", new Object[]{Long.valueOf(this.period), selectPingReqMembers, member, Integer.valueOf(pingInterval)});
            publishPingResult(member, MemberStatus.SUSPECT);
        });
        Message build = Message.withData(new PingData(member2, member)).qualifier(PING_REQ).correlationId(str).build();
        LOGGER.trace("Send PingReq[{}] to {} for {}", new Object[]{Long.valueOf(this.period), selectPingReqMembers, member});
        Flux.fromIterable(selectPingReqMembers).flatMap(member3 -> {
            return this.transport.send(member3.address(), build);
        }).subscribe();
    }

    private void onMessage(Message message) {
        if (isPing(message)) {
            onPing(message);
        } else if (isPingReq(message)) {
            onPingReq(message);
        } else if (isTransitPingAck(message)) {
            onTransitPingAck(message);
        }
    }

    private void onPing(Message message) {
        LOGGER.trace("Received Ping: {}", message);
        PingData pingData = (PingData) message.data();
        Member member = this.memberSupplier.get();
        if (!pingData.getTo().id().equals(member.id())) {
            LOGGER.warn("Received Ping to {}, but local member is {}", pingData.getTo(), member);
            return;
        }
        Message build = Message.withData(pingData).qualifier(PING_ACK).correlationId(message.correlationId()).build();
        LOGGER.trace("Send PingAck to {}", pingData.getFrom().address());
        this.transport.send(pingData.getFrom().address(), build).subscribe();
    }

    private void onPingReq(Message message) {
        LOGGER.trace("Received PingReq: {}", message);
        PingData pingData = (PingData) message.data();
        Member to = pingData.getTo();
        Member from = pingData.getFrom();
        Message build = Message.withData(new PingData(this.memberSupplier.get(), to, from)).qualifier(PING).correlationId(message.correlationId()).build();
        LOGGER.trace("Send transit Ping to {}", to.address());
        this.transport.send(to.address(), build).subscribe();
    }

    private void onTransitPingAck(Message message) {
        LOGGER.trace("Received transit PingAck: {}", message);
        PingData pingData = (PingData) message.data();
        Member originalIssuer = pingData.getOriginalIssuer();
        Message build = Message.withData(new PingData(originalIssuer, pingData.getTo())).qualifier(PING_ACK).correlationId(message.correlationId()).build();
        LOGGER.trace("Resend transit PingAck to {}", originalIssuer.address());
        this.transport.send(originalIssuer.address(), build).subscribe();
    }

    private void onError(Throwable th) {
        LOGGER.error("Received unexpected error: ", th);
    }

    private void onMemberEvent(MembershipEvent membershipEvent) {
        int indexOf;
        Member member = membershipEvent.member();
        if (membershipEvent.isRemoved()) {
            this.pingMembers.removeIf(member2 -> {
                return member2.id().equals(member.id());
            });
        }
        if (membershipEvent.isAdded()) {
            int size = this.pingMembers.size();
            this.pingMembers.add(size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0, member);
        }
        if (!membershipEvent.isUpdated() || (indexOf = this.pingMembers.indexOf(membershipEvent.oldMember())) == -1) {
            return;
        }
        this.pingMembers.set(indexOf, membershipEvent.newMember());
    }

    private Member selectPingMember() {
        if (this.pingMembers.isEmpty()) {
            return null;
        }
        if (this.pingMemberIndex >= this.pingMembers.size()) {
            this.pingMemberIndex = 0;
            Collections.shuffle(this.pingMembers);
        }
        List<Member> list = this.pingMembers;
        int i = this.pingMemberIndex;
        this.pingMemberIndex = i + 1;
        return list.get(i);
    }

    private List<Member> selectPingReqMembers(Member member) {
        if (this.config.getPingReqMembers() <= 0) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList(this.pingMembers);
        arrayList.remove(member);
        if (arrayList.isEmpty()) {
            return Collections.emptyList();
        }
        Collections.shuffle(arrayList);
        return arrayList.size() < this.config.getPingReqMembers() ? arrayList : arrayList.subList(0, this.config.getPingReqMembers());
    }

    private void publishPingResult(Member member, MemberStatus memberStatus) {
        LOGGER.debug("Member {} detected as {}", member, memberStatus);
        this.sink.next(new FailureDetectorEvent(member, memberStatus));
    }

    private boolean isPing(Message message) {
        return PING.equals(message.qualifier());
    }

    private boolean isPingReq(Message message) {
        return PING_REQ.equals(message.qualifier());
    }

    private boolean isPingAck(Message message) {
        return PING_ACK.equals(message.qualifier()) && ((PingData) message.data()).getOriginalIssuer() == null;
    }

    private boolean isTransitPingAck(Message message) {
        return PING_ACK.equals(message.qualifier()) && ((PingData) message.data()).getOriginalIssuer() != null;
    }

    Transport getTransport() {
        return this.transport;
    }
}
