package tech.ydb.coordination.impl;

import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Issue;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.proto.coordination.SessionRequest;
import tech.ydb.proto.coordination.SessionResponse;

/* loaded from: input_file:tech/ydb/coordination/impl/Stream.class */
class Stream implements GrpcReadStream.Observer<SessionResponse> {
    private static final int SHUTDOWN_TIMEOUT_MS = 1000;
    private static final Logger logger = LoggerFactory.getLogger(Stream.class);
    private final ScheduledExecutorService scheduler;
    private final GrpcReadWriteStream<SessionResponse, SessionRequest> stream;
    private final CompletableFuture<Status> stopFuture = new CompletableFuture<>();
    private final CompletableFuture<Result<Long>> startFuture = new CompletableFuture<>();
    private final Map<Long, StreamMsg<?>> messages = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public Stream(Rpc rpc) {
        this.scheduler = rpc.getScheduler();
        this.stream = rpc.createSession(GrpcRequestSettings.newBuilder().build());
    }

    public CompletableFuture<Status> startStream() {
        this.stream.start(this).whenComplete((status, th) -> {
            if (th != null) {
                this.startFuture.completeExceptionally(th);
                this.stopFuture.completeExceptionally(th);
            }
            if (status != null) {
                this.startFuture.complete(Result.fail(status.isSuccess() ? Status.of(StatusCode.BAD_REQUEST) : status));
                this.stopFuture.complete(status);
            }
        });
        return this.stopFuture;
    }

    public Collection<StreamMsg<?>> getMessages() {
        return this.messages.values();
    }

    public void cancelStream() {
        logger.trace("stream {} cancel stream", Integer.valueOf(hashCode()));
        this.stream.cancel();
    }

    public CompletableFuture<Result<Long>> sendSessionStart(long j, String str, Duration duration, ByteString byteString) {
        SessionRequest build = SessionRequest.newBuilder().setSessionStart(SessionRequest.SessionStart.newBuilder().setSessionId(j).setPath(str).setTimeoutMillis(duration.toMillis()).setProtectionKey(byteString).build()).build();
        logger.trace("stream {} send session start msg {}", Integer.valueOf(hashCode()), Long.valueOf(j));
        this.stream.sendNext(build);
        return this.startFuture;
    }

    public CompletableFuture<Status> stop() {
        if (this.stopFuture.isDone()) {
            return this.stopFuture;
        }
        SessionRequest build = SessionRequest.newBuilder().setSessionStop(SessionRequest.SessionStop.newBuilder().build()).build();
        logger.trace("stream {} send session stop msg", Integer.valueOf(hashCode()));
        this.stream.sendNext(build);
        ScheduledFuture<?> schedule = this.scheduler.schedule(this::cancelStream, 1000L, TimeUnit.MILLISECONDS);
        this.stopFuture.whenComplete((status, th) -> {
            if (schedule.isDone()) {
                return;
            }
            schedule.cancel(true);
        });
        return this.stopFuture;
    }

    public void sendMsg(long j, StreamMsg<?> streamMsg) {
        StreamMsg<?> put = this.messages.put(Long.valueOf(j), streamMsg);
        SessionRequest makeRequest = streamMsg.makeRequest(j);
        logger.trace("stream {} send message {}", Integer.valueOf(hashCode()), TextFormat.shortDebugString(makeRequest));
        this.stream.sendNext(makeRequest);
        if (put != null) {
            put.handleError(Status.of(StatusCode.CLIENT_CANCELLED));
        }
    }

    public void onNext(SessionResponse sessionResponse) {
        if (sessionResponse.hasFailure()) {
            onFail(sessionResponse.getFailure());
            return;
        }
        if (sessionResponse.hasSessionStarted()) {
            onSessionStarted(sessionResponse.getSessionStarted());
            return;
        }
        if (sessionResponse.hasSessionStopped()) {
            onSessionStopped(sessionResponse.getSessionStopped());
            return;
        }
        if (sessionResponse.hasPing()) {
            onPing(sessionResponse.getPing());
            return;
        }
        if (sessionResponse.hasPong()) {
            logger.trace("stream {} got pong msg {}", Integer.valueOf(hashCode()), Long.toUnsignedString(sessionResponse.getPong().getOpaque()));
            return;
        }
        if (sessionResponse.hasAcquireSemaphorePending()) {
            logger.trace("stream {} got acquire semaphore pending msg {}", Integer.valueOf(hashCode()), Long.valueOf(sessionResponse.getAcquireSemaphorePending().getReqId()));
            return;
        }
        if (sessionResponse.hasCreateSemaphoreResult()) {
            onNextMessage(sessionResponse.getCreateSemaphoreResult().getReqId(), sessionResponse);
        }
        if (sessionResponse.hasDeleteSemaphoreResult()) {
            onNextMessage(sessionResponse.getDeleteSemaphoreResult().getReqId(), sessionResponse);
        }
        if (sessionResponse.hasUpdateSemaphoreResult()) {
            onNextMessage(sessionResponse.getUpdateSemaphoreResult().getReqId(), sessionResponse);
        }
        if (sessionResponse.hasDescribeSemaphoreResult()) {
            onNextMessage(sessionResponse.getDescribeSemaphoreResult().getReqId(), sessionResponse);
        }
        if (sessionResponse.hasAcquireSemaphoreResult()) {
            onNextMessage(sessionResponse.getAcquireSemaphoreResult().getReqId(), sessionResponse);
        }
        if (sessionResponse.hasReleaseSemaphoreResult()) {
            onNextMessage(sessionResponse.getReleaseSemaphoreResult().getReqId(), sessionResponse);
        }
        if (sessionResponse.hasDescribeSemaphoreChanged()) {
            onNextMessage(sessionResponse.getDescribeSemaphoreChanged().getReqId(), sessionResponse);
        }
    }

    public void onNextMessage(long j, SessionResponse sessionResponse) {
        StreamMsg<?> put;
        StreamMsg<?> remove = this.messages.remove(Long.valueOf(j));
        if (remove == null || !remove.handleResponse(sessionResponse)) {
            logger.warn("stream {} lost response {}", Integer.valueOf(hashCode()), TextFormat.shortDebugString(sessionResponse));
            return;
        }
        logger.trace("stream {} got response {}", Integer.valueOf(hashCode()), TextFormat.shortDebugString(sessionResponse));
        StreamMsg<?> nextMsg = remove.nextMsg();
        if (nextMsg == null || (put = this.messages.put(Long.valueOf(j), nextMsg)) == null) {
            return;
        }
        put.handleError(Status.of(StatusCode.CLIENT_CANCELLED));
    }

    private void onFail(SessionResponse.Failure failure) {
        Status of = Status.of(StatusCode.fromProto(failure.getStatus()), Issue.fromPb(failure.getIssuesList()));
        logger.trace("stream {} got fail message {}", Integer.valueOf(hashCode()), of);
        this.stopFuture.complete(of);
        this.startFuture.complete(Result.fail(of));
    }

    private void onSessionStarted(SessionResponse.SessionStarted sessionStarted) {
        long sessionId = sessionStarted.getSessionId();
        if (this.startFuture.complete(Result.success(Long.valueOf(sessionId)))) {
            logger.trace("stream {} started with id {}", Integer.valueOf(hashCode()), Long.valueOf(sessionId));
        } else {
            logger.warn("stream {} lost the start message with id {}", Integer.valueOf(hashCode()), Long.valueOf(sessionId));
        }
    }

    private void onSessionStopped(SessionResponse.SessionStopped sessionStopped) {
        logger.trace("stream {} stopped with id {}", Integer.valueOf(hashCode()), Long.valueOf(sessionStopped.getSessionId()));
        this.stream.close();
    }

    private void onPing(SessionResponse.PingPong pingPong) {
        long opaque = pingPong.getOpaque();
        SessionRequest build = SessionRequest.newBuilder().setPong(SessionRequest.PingPong.newBuilder().setOpaque(opaque).build()).build();
        logger.trace("stream {} got ping msg {}, sending pong msg", Integer.valueOf(hashCode()), Long.toUnsignedString(opaque));
        this.stream.sendNext(build);
    }
}
