package io.streamnative.oxia.client.session;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.common.Attributes;
import io.streamnative.oxia.client.ClientConfig;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.grpc.OxiaStubProvider;
import io.streamnative.oxia.client.metrics.Counter;
import io.streamnative.oxia.client.metrics.InstrumentProvider;
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.proto.CloseSessionRequest;
import io.streamnative.oxia.proto.CloseSessionResponse;
import io.streamnative.oxia.proto.KeepAliveResponse;
import io.streamnative.oxia.proto.SessionHeartbeat;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/oxia-client-0.3.2.jar:io/streamnative/oxia/client/session/Session.class */
public class Session implements StreamObserver<KeepAliveResponse> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Session.class);

    @NonNull
    private final OxiaStubProvider stubProvider;

    @NonNull
    private final Duration sessionTimeout;

    @NonNull
    private final Duration heartbeatInterval;

    @VisibleForTesting
    private final long shardId;
    private final long sessionId;
    private final String clientIdentifier;

    @NonNull
    private final SessionHeartbeat heartbeat;

    @NonNull
    private final SessionNotificationListener listener;
    private volatile boolean closed;
    private Counter sessionsOpened;
    private Counter sessionsExpired;
    private Counter sessionsClosed;
    private final ScheduledFuture<?> heartbeatFuture;
    private volatile Instant lastSuccessfullResponse;

    /* JADX INFO: Access modifiers changed from: package-private */
    public Session(@NonNull ScheduledExecutorService scheduledExecutorService, @NonNull OxiaStubProvider oxiaStubProvider, @NonNull ClientConfig clientConfig, long j, long j2, InstrumentProvider instrumentProvider, SessionNotificationListener sessionNotificationListener) {
        if (scheduledExecutorService == null) {
            throw new NullPointerException("executor is marked non-null but is null");
        }
        if (oxiaStubProvider == null) {
            throw new NullPointerException("stubProvider is marked non-null but is null");
        }
        if (clientConfig == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        this.stubProvider = oxiaStubProvider;
        this.sessionTimeout = clientConfig.sessionTimeout();
        this.heartbeatInterval = Duration.ofMillis(Math.max(clientConfig.sessionTimeout().toMillis() / 10, Duration.ofSeconds(2L).toMillis()));
        this.shardId = j;
        this.sessionId = j2;
        this.clientIdentifier = clientConfig.clientIdentifier();
        this.heartbeat = SessionHeartbeat.newBuilder().setShardId(j).setSessionId(j2).build();
        this.listener = sessionNotificationListener;
        log.info("Session created shard={} sessionId={} clientIdentity={}", Long.valueOf(j), Long.valueOf(j2), clientConfig.clientIdentifier());
        this.sessionsOpened = instrumentProvider.newCounter("oxia.client.sessions.opened", Unit.Sessions, "The total number of sessions opened by this client", Attributes.builder().put("oxia.shard", j).build());
        this.sessionsExpired = instrumentProvider.newCounter("oxia.client.sessions.expired", Unit.Sessions, "The total number of sessions expired int this client", Attributes.builder().put("oxia.shard", j).build());
        this.sessionsClosed = instrumentProvider.newCounter("oxia.client.sessions.closed", Unit.Sessions, "The total number of sessions closed by this client", Attributes.builder().put("oxia.shard", j).build());
        this.sessionsOpened.increment();
        this.lastSuccessfullResponse = Instant.now();
        this.heartbeatFuture = scheduledExecutorService.scheduleAtFixedRate(this::sendKeepAlive, this.heartbeatInterval.toMillis(), this.heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void sendKeepAlive() {
        if (Duration.between(this.lastSuccessfullResponse, Instant.now()).toMillis() > this.sessionTimeout.toMillis()) {
            handleSessionExpired();
        } else {
            this.stubProvider.getStubForShard(this.shardId).async().keepAlive(this.heartbeat, this);
        }
    }

    @Override // io.grpc.stub.StreamObserver
    public void onNext(KeepAliveResponse keepAliveResponse) {
        this.lastSuccessfullResponse = Instant.now();
        if (log.isDebugEnabled()) {
            log.debug("Received keep-alive response shard={} sessionId={} clientIdentity={}", Long.valueOf(this.shardId), Long.valueOf(this.sessionId), this.clientIdentifier);
        }
    }

    @Override // io.grpc.stub.StreamObserver
    public void onError(Throwable th) {
        log.warn("Error during session keep-alive shard={} sessionId={} clientIdentity={}: {}", Long.valueOf(this.shardId), Long.valueOf(this.sessionId), this.clientIdentifier, th.getMessage());
    }

    @Override // io.grpc.stub.StreamObserver
    public void onCompleted() {
    }

    private void handleSessionExpired() {
        this.sessionsExpired.increment();
        log.warn("Session expired shard={} sessionId={} clientIdentity={}", Long.valueOf(this.shardId), Long.valueOf(this.sessionId), this.clientIdentifier);
        close();
    }

    public CompletableFuture<Void> close() {
        this.sessionsClosed.increment();
        this.heartbeatFuture.cancel(true);
        OxiaStub stubForShard = this.stubProvider.getStubForShard(this.shardId);
        CloseSessionRequest build = CloseSessionRequest.newBuilder().setShardId(this.shardId).setSessionId(this.sessionId).build();
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        stubForShard.async().closeSession(build, new StreamObserver<CloseSessionResponse>() { // from class: io.streamnative.oxia.client.session.Session.1
            @Override // io.grpc.stub.StreamObserver
            public void onNext(CloseSessionResponse closeSessionResponse) {
                Session.log.info("Session closed shard={} sessionId={} clientIdentity={}", Long.valueOf(Session.this.shardId), Long.valueOf(Session.this.sessionId), Session.this.clientIdentifier);
                Session.this.listener.onSessionClosed(Session.this);
                completableFuture.complete(null);
            }

            @Override // io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                Session.this.listener.onSessionClosed(Session.this);
                completableFuture.complete(null);
            }

            @Override // io.grpc.stub.StreamObserver
            public void onCompleted() {
            }
        });
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getShardId() {
        return this.shardId;
    }

    public long getSessionId() {
        return this.sessionId;
    }
}
