package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.exceptions.UnsupportedSetupException;
import io.rsocket.frame.ErrorFrameFlyweight;
import io.rsocket.frame.ResumeFrameFlyweight;
import io.rsocket.frame.SetupFrameFlyweight;
import io.rsocket.internal.ClientServerInputMultiplexer;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.ServerRSocketSession;
import io.rsocket.resume.SessionManager;
import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Function;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.0.0-RC7.jar:io/rsocket/core/ServerSetup.class */
public abstract class ServerSetup {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.0.0-RC7.jar:io/rsocket/core/ServerSetup$DefaultServerSetup.class */
    public static class DefaultServerSetup extends ServerSetup {
        @Override // io.rsocket.core.ServerSetup
        public Mono<Void> acceptRSocketSetup(ByteBuf byteBuf, ClientServerInputMultiplexer clientServerInputMultiplexer, BiFunction<KeepAliveHandler, ClientServerInputMultiplexer, Mono<Void>> biFunction) {
            return SetupFrameFlyweight.resumeEnabled(byteBuf) ? sendError(clientServerInputMultiplexer, new UnsupportedSetupException("resume not supported")).doFinally(signalType -> {
                byteBuf.release();
                clientServerInputMultiplexer.dispose();
            }) : biFunction.apply(new KeepAliveHandler.DefaultKeepAliveHandler(clientServerInputMultiplexer), clientServerInputMultiplexer);
        }

        @Override // io.rsocket.core.ServerSetup
        public Mono<Void> acceptRSocketResume(ByteBuf byteBuf, ClientServerInputMultiplexer clientServerInputMultiplexer) {
            return sendError(clientServerInputMultiplexer, new RejectedResumeException("resume not supported")).doFinally(signalType -> {
                byteBuf.release();
                clientServerInputMultiplexer.dispose();
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.0.0-RC7.jar:io/rsocket/core/ServerSetup$ResumableServerSetup.class */
    public static class ResumableServerSetup extends ServerSetup {
        private final SessionManager sessionManager;
        private final Duration resumeSessionDuration;
        private final Duration resumeStreamTimeout;
        private final Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory;
        private final boolean cleanupStoreOnKeepAlive;

        /* JADX INFO: Access modifiers changed from: package-private */
        public ResumableServerSetup(SessionManager sessionManager, Duration duration, Duration duration2, Function<? super ByteBuf, ? extends ResumableFramesStore> function, boolean z) {
            this.sessionManager = sessionManager;
            this.resumeSessionDuration = duration;
            this.resumeStreamTimeout = duration2;
            this.resumeStoreFactory = function;
            this.cleanupStoreOnKeepAlive = z;
        }

        @Override // io.rsocket.core.ServerSetup
        public Mono<Void> acceptRSocketSetup(ByteBuf byteBuf, ClientServerInputMultiplexer clientServerInputMultiplexer, BiFunction<KeepAliveHandler, ClientServerInputMultiplexer, Mono<Void>> biFunction) {
            if (!SetupFrameFlyweight.resumeEnabled(byteBuf)) {
                return biFunction.apply(new KeepAliveHandler.DefaultKeepAliveHandler(clientServerInputMultiplexer), clientServerInputMultiplexer);
            }
            ResumableDuplexConnection resumableConnection = this.sessionManager.save(new ServerRSocketSession(clientServerInputMultiplexer.asClientServerConnection(), this.resumeSessionDuration, this.resumeStreamTimeout, this.resumeStoreFactory, SetupFrameFlyweight.resumeToken(byteBuf), this.cleanupStoreOnKeepAlive)).resumableConnection();
            return biFunction.apply(new KeepAliveHandler.ResumableKeepAliveHandler(resumableConnection), new ClientServerInputMultiplexer(resumableConnection));
        }

        @Override // io.rsocket.core.ServerSetup
        public Mono<Void> acceptRSocketResume(ByteBuf byteBuf, ClientServerInputMultiplexer clientServerInputMultiplexer) {
            ServerRSocketSession serverRSocketSession = this.sessionManager.get(ResumeFrameFlyweight.token(byteBuf));
            return serverRSocketSession != null ? serverRSocketSession.continueWith(clientServerInputMultiplexer.asClientServerConnection()).resumeWith(byteBuf).onClose().then() : sendError(clientServerInputMultiplexer, new RejectedResumeException("unknown resume token")).doFinally(signalType -> {
                byteBuf.release();
                clientServerInputMultiplexer.dispose();
            });
        }

        @Override // io.rsocket.core.ServerSetup
        public void dispose() {
            this.sessionManager.dispose();
        }
    }

    ServerSetup() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract Mono<Void> acceptRSocketSetup(ByteBuf byteBuf, ClientServerInputMultiplexer clientServerInputMultiplexer, BiFunction<KeepAliveHandler, ClientServerInputMultiplexer, Mono<Void>> biFunction);

    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract Mono<Void> acceptRSocketResume(ByteBuf byteBuf, ClientServerInputMultiplexer clientServerInputMultiplexer);

    /* JADX INFO: Access modifiers changed from: package-private */
    public void dispose() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> sendError(ClientServerInputMultiplexer clientServerInputMultiplexer, Exception exc) {
        DuplexConnection asSetupConnection = clientServerInputMultiplexer.asSetupConnection();
        return asSetupConnection.sendOne(ErrorFrameFlyweight.encode(asSetupConnection.alloc(), 0, exc)).onErrorResume(th -> {
            return Mono.empty();
        });
    }
}
