package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.control.Heartbeat;
import io.axoniq.axonserver.grpc.control.PlatformInboundInstruction;
import java.time.Clock;
import java.time.Instant;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonserver/connector/impl/HeartbeatMonitor.class */
public class HeartbeatMonitor {
    private static final Logger logger = LoggerFactory.getLogger(HeartbeatMonitor.class);
    private static final PlatformInboundInstruction HEARTBEAT_MESSAGE = PlatformInboundInstruction.newBuilder().setHeartbeat(Heartbeat.getDefaultInstance()).m3092build();
    static Clock clock = Clock.systemUTC();
    private final ScheduledExecutorService executor;
    private final HeartbeatSender sender;
    private final Runnable onConnectionCorrupted;
    private final AtomicLong nextHeartbeatDeadline = new AtomicLong();
    private final AtomicLong nextHeartbeat = new AtomicLong();
    private final AtomicLong timeout = new AtomicLong(Long.MAX_VALUE);
    private final AtomicLong interval = new AtomicLong(Long.MAX_VALUE);
    private final AtomicInteger taskId = new AtomicInteger();

    public HeartbeatMonitor(ScheduledExecutorService scheduledExecutorService, HeartbeatSender heartbeatSender, Runnable runnable) {
        this.executor = scheduledExecutorService;
        this.sender = heartbeatSender;
        this.onConnectionCorrupted = runnable;
    }

    public void enableHeartbeat(long j, long j2, TimeUnit timeUnit) {
        this.interval.set(timeUnit.toMillis(j));
        this.timeout.set(timeUnit.toMillis(j2));
        long millis = clock.millis();
        this.nextHeartbeat.set(millis);
        this.nextHeartbeatDeadline.set(millis + timeUnit.toMillis(j2));
        int incrementAndGet = this.taskId.incrementAndGet();
        this.executor.execute(() -> {
            checkAndReschedule(incrementAndGet);
        });
    }

    public void disableHeartbeat() {
        this.interval.set(Long.MAX_VALUE);
        this.nextHeartbeatDeadline.set(Long.MAX_VALUE);
        this.taskId.incrementAndGet();
    }

    private void checkAndReschedule(int i) {
        if (i != this.taskId.get()) {
            return;
        }
        checkBeatDeadline();
        sendBeatIfTimeElapsed();
        long min = Math.min(this.interval.get(), 1000L);
        debug("Scheduling next heartbeat verification in {}ms", Long.valueOf(min));
        this.executor.schedule(() -> {
            checkAndReschedule(i);
        }, min, TimeUnit.MILLISECONDS);
    }

    public void pause() {
        long j = this.interval.get();
        long j2 = this.timeout.get();
        if (j == Long.MAX_VALUE && j2 == Long.MAX_VALUE) {
            return;
        }
        this.taskId.incrementAndGet();
    }

    public void resume() {
        long j = this.interval.get();
        long j2 = this.timeout.get();
        if (j == Long.MAX_VALUE || j2 == Long.MAX_VALUE) {
            return;
        }
        enableHeartbeat(j, j2, TimeUnit.MILLISECONDS);
    }

    private void sendBeatIfTimeElapsed() {
        if (shouldSendBeat()) {
            debug("Sending heartbeat due to elapsed next beat interval.", new Object[0]);
            try {
                this.sender.sendHeartbeat().whenComplete((r4, th) -> {
                    handleHeartbeatCallResult(th);
                });
                debug("Next heartbeat has been planned for {} due to sent heartbeat", extendNextHeartbeatTime());
            } catch (Exception e) {
                logger.warn("Was unable to send heartbeat due to exception", e);
            }
        }
    }

    private void debug(String str, Object... objArr) {
        if (logger.isDebugEnabled()) {
            logger.debug(str, objArr);
        }
    }

    private void checkBeatDeadline() {
        if (this.nextHeartbeatDeadline.get() <= clock.millis()) {
            logger.info("Did not receive heartbeat acknowledgement within {}ms", Long.valueOf(this.timeout.get()));
            this.onConnectionCorrupted.run();
            extendHeartbeatDeadline();
        }
    }

    private void handleHeartbeatCallResult(Throwable th) {
        if (!(th == null || isUnsupportedInstructionError(th))) {
            debug("Heartbeat call resulted in an error.", th);
            this.onConnectionCorrupted.run();
        } else {
            if (this.interval.get() == Long.MAX_VALUE) {
                debug("Heartbeat Acknowledgment received but heartbeats were disabled.", new Object[0]);
            } else {
                debug("Heartbeat call succeeded and extended deadline to {}", extendHeartbeatDeadline());
            }
        }
    }

    private boolean isUnsupportedInstructionError(Throwable th) {
        return (th instanceof AxonServerException) && ((AxonServerException) th).getErrorCategory() == ErrorCategory.UNSUPPORTED_INSTRUCTION;
    }

    private boolean shouldSendBeat() {
        return this.nextHeartbeat.get() <= clock.millis();
    }

    private Instant extendNextHeartbeatTime() {
        long millis = clock.millis();
        return Instant.ofEpochMilli(this.nextHeartbeat.updateAndGet(j -> {
            return millis + this.interval.get();
        }));
    }

    private Instant extendHeartbeatDeadline() {
        long millis = clock.millis();
        return Instant.ofEpochMilli(this.nextHeartbeatDeadline.updateAndGet(j -> {
            return Math.max(millis + this.timeout.get() + this.interval.get(), j);
        }));
    }

    public void handleIncomingBeat(ReplyChannel<PlatformInboundInstruction> replyChannel) {
        debug("Received heartbeat call from server, extending deadline to {} and planned next heartbeat for {}", extendHeartbeatDeadline(), extendNextHeartbeatTime());
        try {
            replyChannel.send(HEARTBEAT_MESSAGE);
            replyChannel.complete();
        } catch (Throwable th) {
            replyChannel.complete();
            throw th;
        }
    }
}
