package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.ErrorFrameFlyweight;
import io.rsocket.frame.ResumeFrameFlyweight;
import io.rsocket.frame.ResumeOkFrameFlyweight;
import io.rsocket.internal.ClientServerInputMultiplexer;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.0.0-RC7.jar:io/rsocket/resume/ClientRSocketSession.class */
public class ClientRSocketSession implements RSocketSession<Mono<DuplexConnection>> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) ClientRSocketSession.class);
    private final ResumableDuplexConnection resumableConnection;
    private volatile Mono<DuplexConnection> newConnection;
    private volatile ByteBuf resumeToken;
    private final ByteBufAllocator allocator;

    /* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.0.0-RC7.jar:io/rsocket/resume/ClientRSocketSession$RetrySignal.class */
    private static class RetrySignal implements Retry.RetrySignal {
        private final Throwable ex;

        RetrySignal(Throwable th) {
            this.ex = th;
        }

        @Override // reactor.util.retry.Retry.RetrySignal
        public long totalRetries() {
            return 0L;
        }

        @Override // reactor.util.retry.Retry.RetrySignal
        public long totalRetriesInARow() {
            return 0L;
        }

        @Override // reactor.util.retry.Retry.RetrySignal
        public Throwable failure() {
            return this.ex;
        }
    }

    public ClientRSocketSession(DuplexConnection duplexConnection, Duration duration, Retry retry, ResumableFramesStore resumableFramesStore, Duration duration2, boolean z) {
        this.allocator = duplexConnection.alloc();
        this.resumableConnection = new ResumableDuplexConnection("client", duplexConnection, resumableFramesStore, duration2, z);
        onClose().doFinally(signalType -> {
            this.resumeToken.release();
        }).subscribe();
        this.resumableConnection.connectionErrors().flatMap(th -> {
            logger.debug("Client session connection error. Starting new connection");
            return this.newConnection.delaySubscription(new AtomicBoolean().compareAndSet(false, true) ? retry.generateCompanion(Flux.just(new RetrySignal(th))) : Mono.empty()).retryWhen(retry).timeout(duration);
        }).map(ClientServerInputMultiplexer::new).subscribe(clientServerInputMultiplexer -> {
            reconnect(clientServerInputMultiplexer.asClientServerConnection());
            long impliedPosition = this.resumableConnection.impliedPosition();
            long position = this.resumableConnection.position();
            logger.debug("Client ResumableConnection reconnected. Sending RESUME frame with state: [impliedPos: {}, pos: {}]", Long.valueOf(impliedPosition), Long.valueOf(position));
            sendFrame(ResumeFrameFlyweight.encode(this.allocator, this.resumeToken.retain(), impliedPosition, position)).then(clientServerInputMultiplexer.asSetupConnection().receive().next()).subscribe((Consumer<? super V>) this::resumeWith);
        }, th2 -> {
            logger.debug("Client ResumableConnection reconnect timeout");
            this.resumableConnection.dispose();
        });
    }

    @Override // io.rsocket.resume.RSocketSession
    public ClientRSocketSession continueWith(Mono<DuplexConnection> mono) {
        this.newConnection = mono;
        return this;
    }

    @Override // io.rsocket.resume.RSocketSession
    public ClientRSocketSession resumeWith(ByteBuf byteBuf) {
        logger.debug("ResumeOK FRAME received");
        long remotePos = remotePos(byteBuf);
        long remoteImpliedPos = remoteImpliedPos(byteBuf);
        byteBuf.release();
        this.resumableConnection.resume(remotePos, remoteImpliedPos, mono -> {
            return mono.then().onErrorResume(th -> {
                Mono<Void> sendFrame = sendFrame(ErrorFrameFlyweight.encode(this.allocator, 0, errorFrameThrowable(remoteImpliedPos)));
                ResumableDuplexConnection resumableDuplexConnection = this.resumableConnection;
                resumableDuplexConnection.getClass();
                return sendFrame.then(Mono.fromRunnable(resumableDuplexConnection::dispose)).then(Mono.never());
            });
        });
        return this;
    }

    public ClientRSocketSession resumeToken(ByteBuf byteBuf) {
        this.resumeToken = byteBuf.retain();
        return this;
    }

    @Override // io.rsocket.resume.RSocketSession
    public void reconnect(DuplexConnection duplexConnection) {
        this.resumableConnection.reconnect(duplexConnection);
    }

    @Override // io.rsocket.resume.RSocketSession
    public ResumableDuplexConnection resumableConnection() {
        return this.resumableConnection;
    }

    @Override // io.rsocket.resume.RSocketSession
    public ByteBuf token() {
        return this.resumeToken;
    }

    private Mono<Void> sendFrame(ByteBuf byteBuf) {
        return this.resumableConnection.sendOne(byteBuf).onErrorResume(th -> {
            return Mono.empty();
        });
    }

    private static long remoteImpliedPos(ByteBuf byteBuf) {
        return ResumeOkFrameFlyweight.lastReceivedClientPos(byteBuf);
    }

    private static long remotePos(ByteBuf byteBuf) {
        return -1L;
    }

    private static ConnectionErrorException errorFrameThrowable(long j) {
        return new ConnectionErrorException("resumption_server_pos=[" + j + "]");
    }
}
