package io.scalecube.services.gateway.ws;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaders;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.BadRequestException;
import io.scalecube.services.exceptions.ForbiddenException;
import io.scalecube.services.exceptions.InternalServiceException;
import io.scalecube.services.exceptions.ServiceException;
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
import io.scalecube.services.exceptions.ServiceUnavailableException;
import io.scalecube.services.exceptions.UnauthorizedException;
import io.scalecube.services.gateway.GatewaySessionHandler;
import io.scalecube.services.gateway.ReferenceCountUtil;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;
import reactor.util.context.Context;

/* loaded from: input_file:io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.class */
public class WebsocketGatewayAcceptor implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> {
    private static final int DEFAULT_ERROR_CODE = 500;
    private static final AtomicLong SESSION_ID_GENERATOR = new AtomicLong(System.currentTimeMillis());
    private final WebsocketServiceMessageCodec messageCodec = new WebsocketServiceMessageCodec();
    private final ServiceCall serviceCall;
    private final GatewaySessionHandler gatewayHandler;
    private final ServiceProviderErrorMapper errorMapper;

    public WebsocketGatewayAcceptor(ServiceCall serviceCall, GatewaySessionHandler gatewaySessionHandler, ServiceProviderErrorMapper serviceProviderErrorMapper) {
        this.serviceCall = (ServiceCall) Objects.requireNonNull(serviceCall, "serviceCall");
        this.gatewayHandler = (GatewaySessionHandler) Objects.requireNonNull(gatewaySessionHandler, "gatewayHandler");
        this.errorMapper = (ServiceProviderErrorMapper) Objects.requireNonNull(serviceProviderErrorMapper, "errorMapper");
    }

    @Override // java.util.function.BiFunction
    public Publisher<Void> apply(HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse) {
        Map<String, String> computeHeaders = computeHeaders(httpServerRequest.requestHeaders());
        long incrementAndGet = SESSION_ID_GENERATOR.incrementAndGet();
        return this.gatewayHandler.onConnectionOpen(incrementAndGet, computeHeaders).doOnError(th -> {
            httpServerResponse.status(toStatusCode(th)).send().subscribe();
        }).then(Mono.defer(() -> {
            return httpServerResponse.sendWebsocket((websocketInbound, websocketOutbound) -> {
                return onConnect(new WebsocketGatewaySession(incrementAndGet, this.messageCodec, computeHeaders, websocketInbound, websocketOutbound, this.gatewayHandler));
            });
        })).onErrorResume(th2 -> {
            return Mono.empty();
        });
    }

    private static Map<String, String> computeHeaders(HttpHeaders httpHeaders) {
        return (Map) httpHeaders.entries().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    private static int toStatusCode(Throwable th) {
        int i = DEFAULT_ERROR_CODE;
        if (th instanceof ServiceException) {
            if (th instanceof BadRequestException) {
                i = 400;
            } else if (th instanceof UnauthorizedException) {
                i = 401;
            } else if (th instanceof ForbiddenException) {
                i = 403;
            } else if (th instanceof ServiceUnavailableException) {
                i = 503;
            } else if (th instanceof InternalServiceException) {
                i = DEFAULT_ERROR_CODE;
            }
        }
        return i;
    }

    private Mono<Void> onConnect(WebsocketGatewaySession websocketGatewaySession) {
        this.gatewayHandler.onSessionOpen(websocketGatewaySession);
        websocketGatewaySession.receive().doOnError(th -> {
            this.gatewayHandler.onSessionError(websocketGatewaySession, th);
        }).subscribe(byteBuf -> {
            if (byteBuf.isReadable()) {
                Mono.deferWithContext(context -> {
                    return onRequest(websocketGatewaySession, byteBuf, context);
                }).subscriberContext(context2 -> {
                    return this.gatewayHandler.onRequest(websocketGatewaySession, byteBuf, context2);
                }).subscribe();
            } else {
                ReferenceCountUtil.safestRelease(byteBuf);
            }
        });
        return websocketGatewaySession.onClose(() -> {
            this.gatewayHandler.onSessionClose(websocketGatewaySession);
        });
    }

    private Mono<ServiceMessage> onRequest(WebsocketGatewaySession websocketGatewaySession, ByteBuf byteBuf, Context context) {
        return Mono.fromCallable(() -> {
            return this.messageCodec.decode(byteBuf);
        }).map(GatewayMessages::validateSid).flatMap(serviceMessage -> {
            return onCancel(websocketGatewaySession, serviceMessage);
        }).map(obj -> {
            return GatewayMessages.validateSidOnSession(websocketGatewaySession, (ServiceMessage) obj);
        }).map(GatewayMessages::validateQualifier).map(serviceMessage2 -> {
            return this.gatewayHandler.mapMessage(websocketGatewaySession, serviceMessage2, context);
        }).doOnNext(serviceMessage3 -> {
            onRequest(websocketGatewaySession, serviceMessage3, context);
        }).doOnError(th -> {
            if (!(th instanceof WebsocketContextException)) {
                this.gatewayHandler.onError(websocketGatewaySession, th, context);
                return;
            }
            WebsocketContextException websocketContextException = (WebsocketContextException) th;
            websocketContextException.releaseRequest();
            websocketGatewaySession.send(GatewayMessages.toErrorResponse(this.errorMapper, websocketContextException.request(), websocketContextException.getCause())).subscriberContext(context).subscribe();
        });
    }

    private void onRequest(WebsocketGatewaySession websocketGatewaySession, ServiceMessage serviceMessage, Context context) {
        long sid = GatewayMessages.getSid(serviceMessage);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Flux requestMany = this.serviceCall.requestMany(serviceMessage);
        Optional map = Optional.ofNullable(serviceMessage.header("rlimit")).map(Integer::valueOf);
        requestMany.getClass();
        Flux map2 = ((Flux) map.map((v1) -> {
            return r1.limitRate(v1);
        }).orElse(requestMany)).map(serviceMessage2 -> {
            boolean z = false;
            if (serviceMessage2.isError()) {
                atomicBoolean.set(true);
                z = true;
            }
            return GatewayMessages.newResponseMessage(sid, serviceMessage2, z);
        });
        websocketGatewaySession.getClass();
        websocketGatewaySession.register(Long.valueOf(sid), map2.flatMap(websocketGatewaySession::send).doOnError(th -> {
            ReferenceCountUtil.safestRelease(serviceMessage.data());
        }).doOnError(th2 -> {
            websocketGatewaySession.send(GatewayMessages.toErrorResponse(this.errorMapper, serviceMessage, th2)).subscriberContext(context).subscribe();
        }).doOnComplete(() -> {
            if (atomicBoolean.get()) {
                return;
            }
            websocketGatewaySession.send(GatewayMessages.newCompleteMessage(sid, serviceMessage.qualifier())).subscriberContext(context).subscribe();
        }).doFinally(signalType -> {
            websocketGatewaySession.dispose(Long.valueOf(sid));
        }).subscriberContext(context).subscribe());
    }

    private Mono<?> onCancel(WebsocketGatewaySession websocketGatewaySession, ServiceMessage serviceMessage) {
        if (GatewayMessages.getSignal(serviceMessage) != Signal.CANCEL) {
            return Mono.just(serviceMessage);
        }
        Optional.ofNullable(serviceMessage.data()).ifPresent(ReferenceCountUtil::safestRelease);
        long sid = GatewayMessages.getSid(serviceMessage);
        websocketGatewaySession.dispose(Long.valueOf(sid));
        return websocketGatewaySession.send(GatewayMessages.newCancelMessage(sid, serviceMessage.qualifier()));
    }
}
