package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.exceptions.UnsupportedSetupException;
import io.rsocket.frame.ResumeFrameCodec;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.ServerRSocketSession;
import io.rsocket.resume.SessionManager;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.1.2.jar:io/rsocket/core/ServerSetup.class */
/**
 * Server-side strategy for reacting to the SETUP and RESUME frames of a connection.
 *
 * <p>Two implementations are nested here: {@link DefaultServerSetup}, which answers every
 * resume-related request with an error, and {@link ResumableServerSetup}, which tracks
 * sessions through a {@link SessionManager}.
 *
 * <p>NOTE(review): this source was decompiled; the decompiler reported that the class and
 * several members were package-private in the original bytecode. Visibility is kept exactly
 * as decompiled so existing callers keep compiling.
 */
public abstract class ServerSetup {
    /** Timeout applied to the {@link Mono} returned by {@link #init(DuplexConnection)}. */
    final Duration timeout;

    /**
     * @param duration timeout later applied by {@link #init(DuplexConnection)}
     */
    protected ServerSetup(Duration duration) {
        this.timeout = duration;
    }

    /**
     * Builds the {@link Mono} through which the first frame of {@code duplexConnection} is
     * delivered together with a connection.
     *
     * <p>When the returned Mono is requested, a {@code SetupHandlingDuplexConnection} is created
     * around {@code duplexConnection} and handed the sink; presumably that object completes the
     * sink (its implementation is not part of this file). The result is bounded by
     * {@link #timeout} and raced against the connection closing, in which case a
     * {@link ClosedChannelException} is signalled.
     *
     * @param duplexConnection the freshly accepted connection
     * @return a Mono of the frame/connection pair, failing on timeout or connection close
     */
    public Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection duplexConnection) {
        // Explicit type witness: create(...) is the receiver of a method chain, so its element
        // type cannot be inferred from this method's return type and would default to Object.
        return Mono.<Tuple2<ByteBuf, DuplexConnection>>create(
                        sink -> sink.onRequest(
                                ignored -> new SetupHandlingDuplexConnection(duplexConnection, sink)))
                .timeout(this.timeout)
                .or(duplexConnection.onClose().then(Mono.error(ClosedChannelException::new)));
    }

    /**
     * Handles a SETUP frame.
     *
     * @param byteBuf the SETUP frame
     * @param duplexConnection the connection the frame arrived on
     * @param biFunction continuation that receives the keep-alive handler and the connection to
     *     use from now on
     * @return the Mono produced by {@code biFunction}, or the connection's {@code onClose()} when
     *     the setup is rejected
     */
    public abstract Mono<Void> acceptRSocketSetup(ByteBuf byteBuf, DuplexConnection duplexConnection, BiFunction<KeepAliveHandler, DuplexConnection, Mono<Void>> biFunction);

    /**
     * Handles a RESUME frame.
     *
     * @param byteBuf the RESUME frame
     * @param duplexConnection the connection the frame arrived on
     * @return {@code duplexConnection.onClose()} in both implementations in this file
     */
    public abstract Mono<Void> acceptRSocketResume(ByteBuf byteBuf, DuplexConnection duplexConnection);

    /** Releases resources held by this setup; the base implementation does nothing. */
    public void dispose() {
    }

    /**
     * Reports {@code rSocketErrorException} to the peer by delegating to
     * {@link DuplexConnection#sendErrorAndClose}.
     *
     * @param duplexConnection connection to send the error on
     * @param rSocketErrorException error to send
     */
    public void sendError(DuplexConnection duplexConnection, RSocketErrorException rSocketErrorException) {
        duplexConnection.sendErrorAndClose(rSocketErrorException);
    }

    /** Setup strategy without resumption support: resume requests are answered with an error. */
    static class DefaultServerSetup extends ServerSetup {
        /**
         * @param duration timeout forwarded to {@link ServerSetup}
         */
        public DefaultServerSetup(Duration duration) {
            super(duration);
        }

        /**
         * Rejects SETUP frames that have resume enabled with an
         * {@link UnsupportedSetupException}; otherwise continues with a default keep-alive
         * handler on the unmodified connection.
         */
        @Override
        public Mono<Void> acceptRSocketSetup(ByteBuf byteBuf, DuplexConnection duplexConnection, BiFunction<KeepAliveHandler, DuplexConnection, Mono<Void>> biFunction) {
            if (SetupFrameCodec.resumeEnabled(byteBuf)) {
                sendError(duplexConnection, new UnsupportedSetupException("resume not supported"));
                return duplexConnection.onClose();
            }
            return biFunction.apply(new KeepAliveHandler.DefaultKeepAliveHandler(duplexConnection), duplexConnection);
        }

        /** Always rejects the RESUME frame with a {@link RejectedResumeException}. */
        @Override
        public Mono<Void> acceptRSocketResume(ByteBuf byteBuf, DuplexConnection duplexConnection) {
            sendError(duplexConnection, new RejectedResumeException("resume not supported"));
            return duplexConnection.onClose();
        }
    }

    /** Setup strategy that creates and looks up resumable sessions via a {@link SessionManager}. */
    static class ResumableServerSetup extends ServerSetup {
        private final SessionManager sessionManager;
        private final Duration resumeSessionDuration;
        // Stored but not read anywhere in this class.
        private final Duration resumeStreamTimeout;
        private final Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory;
        private final boolean cleanupStoreOnKeepAlive;

        /**
         * @param duration timeout forwarded to {@link ServerSetup}
         * @param sessionManager registry used to save and look up sessions by resume token
         * @param duration2 resume session duration, passed to each new {@link ServerRSocketSession}
         * @param duration3 resume stream timeout (only stored by this class)
         * @param function factory creating a {@link ResumableFramesStore} from a resume token
         * @param z cleanup-store-on-keep-alive flag, passed to each new {@link ServerRSocketSession}
         */
        public ResumableServerSetup(Duration duration, SessionManager sessionManager, Duration duration2, Duration duration3, Function<? super ByteBuf, ? extends ResumableFramesStore> function, boolean z) {
            super(duration);
            this.sessionManager = sessionManager;
            this.resumeSessionDuration = duration2;
            this.resumeStreamTimeout = duration3;
            this.resumeStoreFactory = function;
            this.cleanupStoreOnKeepAlive = z;
        }

        /**
         * For a SETUP frame with resume enabled, wraps the connection in a
         * {@link ResumableDuplexConnection}, registers a new {@link ServerRSocketSession} under
         * the frame's resume token and continues with a resumable keep-alive handler. Frames
         * without resume continue with a default keep-alive handler on the plain connection.
         */
        @Override
        public Mono<Void> acceptRSocketSetup(ByteBuf byteBuf, DuplexConnection duplexConnection, BiFunction<KeepAliveHandler, DuplexConnection, Mono<Void>> biFunction) {
            if (!SetupFrameCodec.resumeEnabled(byteBuf)) {
                return biFunction.apply(new KeepAliveHandler.DefaultKeepAliveHandler(duplexConnection), duplexConnection);
            }

            ByteBuf resumeToken = SetupFrameCodec.resumeToken(byteBuf);
            ResumableFramesStore framesStore = this.resumeStoreFactory.apply(resumeToken);
            ResumableDuplexConnection resumableConnection =
                    new ResumableDuplexConnection("server", resumeToken, duplexConnection, framesStore);
            ServerRSocketSession session =
                    new ServerRSocketSession(
                            resumeToken,
                            resumableConnection,
                            duplexConnection,
                            framesStore,
                            this.resumeSessionDuration,
                            this.cleanupStoreOnKeepAlive);

            // Register the session before handing control to the continuation.
            this.sessionManager.save(session, resumeToken);

            KeepAliveHandler keepAliveHandler =
                    new KeepAliveHandler.ResumableKeepAliveHandler(resumableConnection, session, session);
            return biFunction.apply(keepAliveHandler, resumableConnection);
        }

        /**
         * Looks up the session for the frame's resume token. A known session is resumed with the
         * new connection; an unknown token is rejected with a {@link RejectedResumeException}.
         */
        @Override
        public Mono<Void> acceptRSocketResume(ByteBuf byteBuf, DuplexConnection duplexConnection) {
            ServerRSocketSession session = this.sessionManager.get(ResumeFrameCodec.token(byteBuf));
            if (session == null) {
                sendError(duplexConnection, new RejectedResumeException("unknown resume token"));
            } else {
                session.resumeWith(byteBuf, duplexConnection);
            }
            return duplexConnection.onClose();
        }

        /** Disposes the underlying {@link SessionManager}. */
        @Override
        public void dispose() {
            this.sessionManager.dispose();
        }
    }
}
