package dev.snowdrop.vertx.http.common;

import dev.snowdrop.vertx.http.common.WriteStreamSubscriber;
import dev.snowdrop.vertx.http.utils.BufferConverter;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.WebSocketBase;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.adapter.AbstractWebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/vertx-spring-boot-starter-http-1.4.0.Beta1.jar:dev/snowdrop/vertx/http/common/VertxWebSocketSession.class */
public class VertxWebSocketSession extends AbstractWebSocketSession<WebSocketBase> {
    private final Logger logger;
    private final BufferConverter bufferConverter;
    private final int requestLimit;

    public VertxWebSocketSession(WebSocketBase webSocketBase, HandshakeInfo handshakeInfo, BufferConverter bufferConverter, int i, int i2) {
        super(webSocketBase, ObjectUtils.getIdentityHexString(webSocketBase), handshakeInfo, bufferConverter.getDataBufferFactory());
        this.logger = LoggerFactory.getLogger(getClass());
        this.bufferConverter = bufferConverter;
        if (i2 < 1 || i < 1) {
            throw new IllegalArgumentException("Max web socket frame and message sizes cannot be less than 1");
        }
        this.requestLimit = (i2 / i) + 1;
    }

    @Override // org.springframework.web.reactive.socket.adapter.AbstractWebSocketSession, org.springframework.web.reactive.socket.WebSocketSession
    public Flux<WebSocketMessage> receive() {
        return Flux.create(fluxSink -> {
            this.logger.debug("{}Connecting to a web socket read stream", getLogPrefix());
            WebSocketBase delegate = getDelegate();
            delegate.pause2().textMessageHandler(str -> {
                this.logger.debug("{}Received text '{}' from a web socket read stream", getLogPrefix(), str);
                fluxSink.next(textMessage(str));
            }).binaryMessageHandler(buffer -> {
                this.logger.debug("{}Received binary '{}' from a web socket read stream", getLogPrefix(), buffer);
                fluxSink.next(binaryMessage(buffer));
            }).pongHandler(buffer2 -> {
                this.logger.debug("{}Received pong '{}' from a web socket read stream", getLogPrefix(), buffer2);
                fluxSink.next(pongMessage(buffer2));
            }).exceptionHandler(th -> {
                this.logger.debug("{}Received exception '{}' from a web socket read stream", getLogPrefix(), th);
                fluxSink.error(th);
            }).endHandler(r6 -> {
                this.logger.debug("{}Web socket read stream ended", getLogPrefix());
                fluxSink.complete();
            });
            fluxSink.onRequest(j -> {
                this.logger.debug("{}Fetching '{}' entries from a web socket read stream", getLogPrefix(), Long.valueOf(j));
                delegate.fetch2(j);
            });
        });
    }

    @Override // org.springframework.web.reactive.socket.adapter.AbstractWebSocketSession, org.springframework.web.reactive.socket.WebSocketSession
    public Mono<Void> send(Publisher<WebSocketMessage> publisher) {
        return Mono.create(monoSink -> {
            this.logger.debug("{}Subscribing to messages publisher", getLogPrefix());
            publisher.subscribe(new WriteStreamSubscriber.Builder().writeStream(getDelegate()).nextHandler(this::messageHandler).endHook(monoSink).requestLimit(this.requestLimit).build());
        });
    }

    @Override // org.springframework.web.reactive.socket.WebSocketSession
    public boolean isOpen() {
        return !getDelegate().isClosed();
    }

    @Override // org.springframework.web.reactive.socket.WebSocketSession
    public Mono<Void> close(CloseStatus closeStatus) {
        this.logger.debug("{}Closing web socket with status '{}'", getLogPrefix(), closeStatus);
        return Mono.create(monoSink -> {
            getDelegate().closeHandler(r6 -> {
                this.logger.debug("{}Web socket closed", getLogPrefix());
                monoSink.success();
            }).close((short) closeStatus.getCode(), closeStatus.getReason());
        });
    }

    @Override // org.springframework.web.reactive.socket.WebSocketSession
    public Mono<CloseStatus> closeStatus() {
        Short closeStatusCode = getDelegate().closeStatusCode();
        if (closeStatusCode == null) {
            return Mono.empty();
        }
        String closeReason = getDelegate().closeReason();
        return closeReason == null ? Mono.just(new CloseStatus(closeStatusCode.shortValue())) : Mono.just(new CloseStatus(closeStatusCode.shortValue(), closeReason));
    }

    private void messageHandler(WebSocketBase webSocketBase, WebSocketMessage webSocketMessage) {
        if (webSocketMessage.getType() == WebSocketMessage.Type.TEXT) {
            webSocketBase.writeTextMessage(webSocketMessage.getPayloadAsText());
            return;
        }
        Buffer buffer = this.bufferConverter.toBuffer(webSocketMessage.getPayload());
        if (webSocketMessage.getType() == WebSocketMessage.Type.PING) {
            webSocketBase.writePing(buffer);
        } else if (webSocketMessage.getType() == WebSocketMessage.Type.PONG) {
            webSocketBase.writePong(buffer);
        } else {
            webSocketBase.writeBinaryMessage(buffer);
        }
    }

    private WebSocketMessage binaryMessage(Buffer buffer) {
        return new WebSocketMessage(WebSocketMessage.Type.BINARY, this.bufferConverter.toDataBuffer(buffer));
    }

    private WebSocketMessage pongMessage(Buffer buffer) {
        return new WebSocketMessage(WebSocketMessage.Type.PONG, this.bufferConverter.toDataBuffer(buffer));
    }
}
