package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.frame.ErrorFrameFlyweight;
import io.rsocket.frame.ResumeFrameFlyweight;
import io.rsocket.frame.ResumeOkFrameFlyweight;
import java.time.Duration;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;

/* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.0.0-RC7.jar:io/rsocket/resume/ServerRSocketSession.class */
/**
 * Server-side session that wraps a {@link ResumableDuplexConnection} and feeds it the connections
 * supplied through {@link #continueWith(DuplexConnection)}.
 *
 * <p>Every connection pushed into the session is handed to {@link #reconnect(DuplexConnection)}.
 * When the resumable connection reports a connection error, the session waits a bounded time for
 * the next connection; if that wait times out (or the pipeline fails for any other reason) the
 * resumable connection is disposed.
 */
public class ServerRSocketSession implements RSocketSession<DuplexConnection> {
    private static final Logger logger = LoggerFactory.getLogger(ServerRSocketSession.class);

    private final ResumableDuplexConnection resumableConnection;
    /** Connections offered via {@link #continueWith(DuplexConnection)}; created with a history size of 0. */
    private final FluxProcessor<DuplexConnection, DuplexConnection> newConnections = ReplayProcessor.create(0);
    /** Allocator of the initial connection, used to encode the RESUME_OK / ERROR frames. */
    private final ByteBufAllocator allocator;
    private final ByteBuf resumeToken;

    /**
     * Creates the session and subscribes to the stream of replacement connections.
     *
     * @param duplexConnection initial connection; its allocator is kept for encoding frames
     * @param duration maximum time to wait for a new connection after a connection error
     * @param duration2 forwarded unchanged to {@link ResumableDuplexConnection}
     *     (NOTE(review): exact meaning is not visible in this file - confirm against that class)
     * @param function factory invoked with the resume token to obtain the {@link ResumableFramesStore}
     * @param byteBuf resume token of this session, later returned by {@link #token()}
     * @param z flag forwarded unchanged to {@link ResumableDuplexConnection}
     *     (NOTE(review): exact meaning is not visible in this file - confirm against that class)
     */
    public ServerRSocketSession(DuplexConnection duplexConnection, Duration duration, Duration duration2, Function<? super ByteBuf, ? extends ResumableFramesStore> function, ByteBuf byteBuf, boolean z) {
        this.allocator = duplexConnection.alloc();
        this.resumeToken = byteBuf;
        this.resumableConnection = new ResumableDuplexConnection("server", duplexConnection, function.apply(byteBuf), duration2, z);

        // Emits nothing itself: it only turns "no new connection within `duration` after a
        // connection error" into an error signal for the subscriber below.
        Mono<DuplexConnection> reconnectDeadline = this.resumableConnection
                .connectionErrors()
                .flatMap(error -> {
                    logger.debug("Starting session timeout due to error", error);
                    return this.newConnections
                            .next()
                            .doOnNext(connection -> logger.debug("Connection after error: {}", connection))
                            .timeout(duration);
                })
                .then()
                .cast(DuplexConnection.class);

        this.newConnections
                .mergeWith(reconnectDeadline)
                .subscribe(
                        connection -> {
                            reconnect(connection);
                            logger.debug("Server ResumableConnection reconnected: {}", connection);
                        },
                        error -> {
                            // Pass the throwable along so a non-timeout failure is not mistaken for a timeout.
                            logger.debug("Server ResumableConnection reconnect timeout", error);
                            this.resumableConnection.dispose();
                        });
    }

    /**
     * Offers a new connection to the session; the subscriber installed by the constructor passes
     * it on to {@link #reconnect(DuplexConnection)}.
     *
     * @param duplexConnection connection to continue the session with
     * @return this session
     */
    @Override
    public ServerRSocketSession continueWith(DuplexConnection duplexConnection) {
        logger.debug("Server continued with connection: {}", duplexConnection);
        this.newConnections.onNext(duplexConnection);
        return this;
    }

    /**
     * Handles a RESUME frame: reads both positions from it, releases the frame and asks the
     * resumable connection to resume from those positions.
     *
     * <p>On success a RESUME_OK frame carrying the emitted position is sent. On failure an ERROR
     * frame describing the rejection is sent, the resumable connection is disposed and the
     * returned pipeline never completes.
     *
     * @param byteBuf RESUME frame; released by this method, also when reading the positions fails
     * @return this session
     */
    @Override
    public ServerRSocketSession resumeWith(ByteBuf byteBuf) {
        logger.debug("Resume FRAME received");
        final long remotePos;
        final long remoteImpliedPos;
        try {
            remotePos = remotePos(byteBuf);
            remoteImpliedPos = remoteImpliedPos(byteBuf);
        } finally {
            // This method releases the frame on the normal path, so it must not leak it when
            // decoding throws (e.g. on a malformed frame).
            byteBuf.release();
        }
        this.resumableConnection.resume(remotePos, remoteImpliedPos, positions ->
                positions
                        .flatMap(impliedPos -> sendFrame(ResumeOkFrameFlyweight.encode(this.allocator, impliedPos)))
                        .onErrorResume(error ->
                                // 0 is presumably the connection-level stream id - confirm against ErrorFrameFlyweight.
                                sendFrame(ErrorFrameFlyweight.encode(this.allocator, 0, errorFrameThrowable(error)))
                                        .then(Mono.fromRunnable(this.resumableConnection::dispose))
                                        // The connection has just been disposed: never signal completion back.
                                        .then(Mono.never())));
        return this;
    }

    /**
     * Delegates to {@link ResumableDuplexConnection#reconnect(DuplexConnection)}.
     *
     * @param duplexConnection connection to switch the resumable connection to
     */
    @Override
    public void reconnect(DuplexConnection duplexConnection) {
        this.resumableConnection.reconnect(duplexConnection);
    }

    /** @return the resumable connection owned by this session */
    @Override
    public ResumableDuplexConnection resumableConnection() {
        return this.resumableConnection;
    }

    /** @return the resume token this session was created with */
    @Override
    public ByteBuf token() {
        return this.resumeToken;
    }

    /**
     * Sends a single frame over the resumable connection on a best-effort basis: a send failure
     * is logged at debug level and converted into an empty completion.
     */
    private Mono<Void> sendFrame(ByteBuf byteBuf) {
        logger.debug("Sending Resume frame: {}", byteBuf);
        return this.resumableConnection.sendOne(byteBuf).onErrorResume(error -> {
            logger.debug("Failed to send Resume frame", error);
            return Mono.empty();
        });
    }

    /** Reads the first available client position from a RESUME frame. */
    private static long remotePos(ByteBuf byteBuf) {
        return ResumeFrameFlyweight.firstAvailableClientPos(byteBuf);
    }

    /** Reads the last received server position from a RESUME frame. */
    private static long remoteImpliedPos(ByteBuf byteBuf) {
        return ResumeFrameFlyweight.lastReceivedServerPos(byteBuf);
    }

    /**
     * Builds the exception reported to the peer when resumption is rejected. A
     * {@link ResumeStateException} is rendered with its remote and local positions; any other
     * failure is rendered with its message only.
     */
    private static RejectedResumeException errorFrameThrowable(Throwable th) {
        final String message;
        if (th instanceof ResumeStateException) {
            ResumeStateException state = (ResumeStateException) th;
            message = String.format(
                    "resumption_pos=[ remote: { pos: %d, impliedPos: %d }, local: { pos: %d, impliedPos: %d }]",
                    state.getRemotePos(),
                    state.getRemoteImpliedPos(),
                    state.getLocalPos(),
                    state.getLocalImpliedPos());
        } else {
            message = String.format("resume_internal_error: %s", th.getMessage());
        }
        return new RejectedResumeException(message);
    }
}
