package org.springframework.web.reactive.socket.adapter;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.AbstractListenerReadPublisher;
import org.springframework.http.server.reactive.AbstractListenerWriteProcessor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketMessage;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:WEB-INF/lib/spring-webflux-5.3.22.jar:org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.class */
/**
 * Base class for WebSocket sessions that adapt a callback/listener style WebSocket
 * API to Reactive Streams.
 *
 * <p>Inbound messages handed to {@link #handleMessage} are queued by an internal
 * {@code WebSocketReceivePublisher} and exposed through {@link #receive()}.
 * Outbound messages passed to {@link #send(Publisher)} are written one at a time
 * by a {@link WebSocketSendProcessor} via {@link #sendMessage(WebSocketMessage)}.
 *
 * <p>The session is also a {@link Subscriber} of {@code Void}: on completion or
 * error it signals the optional handler-completion sink/processor supplied at
 * construction time and then closes the session.
 *
 * @param <T> the type passed through to {@link AbstractWebSocketSession}
 */
public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSocketSession<T> implements Subscriber<Void> {

    /** Buffer size applied in {@link #receive()} when receiving cannot be suspended. */
    private static final int RECEIVE_BUFFER_SIZE = 8192;

    /** Signalled from {@link #onError} / {@link #onComplete}; {@code null} if not supplied. */
    @Nullable
    private final Sinks.Empty<Void> handlerCompletionSink;

    /** Legacy counterpart of {@link #handlerCompletionSink}; {@code null} if not supplied. */
    @Nullable
    @SuppressWarnings("deprecation")
    private final MonoProcessor<Void> handlerCompletionMono;

    private final WebSocketReceivePublisher receivePublisher;

    /** Created by the first (and only permitted) call to {@link #send(Publisher)}. */
    @Nullable
    private volatile WebSocketSendProcessor sendProcessor;

    /** Guards against {@link #send(Publisher)} being invoked more than once. */
    private final AtomicBoolean sendCalled = new AtomicBoolean();

    private final Sinks.One<CloseStatus> closeStatusSink = Sinks.one();

    /**
     * Create a session without any handler-completion sink.
     * All arguments are passed through to {@link AbstractWebSocketSession}.
     */
    public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) {
        // The cast disambiguates between the Sinks.Empty and MonoProcessor overloads.
        this(delegate, id, info, bufferFactory, (Sinks.Empty<Void>) null);
    }

    /**
     * Create a session that signals the given sink when this {@link Subscriber}
     * receives {@code onComplete} or {@code onError}.
     * The first four arguments are passed through to {@link AbstractWebSocketSession}.
     * @param handlerCompletionSink sink to signal on completion/error, or {@code null}
     */
    public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory,
            @Nullable Sinks.Empty<Void> handlerCompletionSink) {
        super(delegate, id, info, bufferFactory);
        this.receivePublisher = new WebSocketReceivePublisher();
        this.handlerCompletionSink = handlerCompletionSink;
        this.handlerCompletionMono = null;
    }

    /**
     * Create a session that signals the given {@link MonoProcessor} when this
     * {@link Subscriber} receives {@code onComplete} or {@code onError}.
     * The first four arguments are passed through to {@link AbstractWebSocketSession}.
     * @param handlerCompletion processor to signal on completion/error, or {@code null}
     * @deprecated prefer the constructor variant that accepts a {@link Sinks.Empty}
     */
    @Deprecated
    public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory,
            @Nullable MonoProcessor<Void> handlerCompletion) {
        super(delegate, id, info, bufferFactory);
        this.receivePublisher = new WebSocketReceivePublisher();
        this.handlerCompletionMono = handlerCompletion;
        this.handlerCompletionSink = null;
    }

    /**
     * Return the processor created by {@link #send(Publisher)}.
     * @throws IllegalStateException if {@code send} has not been called yet
     */
    public WebSocketSendProcessor getSendProcessor() {
        // Read the volatile field once so the null check and the return agree.
        WebSocketSendProcessor processor = this.sendProcessor;
        Assert.state(processor != null, "No WebSocketSendProcessor available");
        return processor;
    }

    /**
     * Expose inbound messages as a {@link Flux}. When {@link #canSuspendReceiving()}
     * is {@code false}, a backpressure buffer of {@code RECEIVE_BUFFER_SIZE}
     * elements is placed in front of the subscriber.
     */
    @Override // org.springframework.web.reactive.socket.adapter.AbstractWebSocketSession, org.springframework.web.reactive.socket.WebSocketSession
    public Flux<WebSocketMessage> receive() {
        if (canSuspendReceiving()) {
            return Flux.from(this.receivePublisher);
        }
        return Flux.from(this.receivePublisher).onBackpressureBuffer(RECEIVE_BUFFER_SIZE);
    }

    /**
     * Send the given messages through a new {@link WebSocketSendProcessor}.
     * Only the first invocation is honoured; later invocations return a
     * {@link Mono} that fails with {@link IllegalStateException}.
     * @param messages the outbound messages
     * @return a {@link Mono} driven by the send processor's signals
     */
    @Override // org.springframework.web.reactive.socket.adapter.AbstractWebSocketSession, org.springframework.web.reactive.socket.WebSocketSession
    public Mono<Void> send(Publisher<WebSocketMessage> messages) {
        if (this.sendCalled.compareAndSet(false, true)) {
            WebSocketSendProcessor processor = new WebSocketSendProcessor();
            // Published immediately so handleError/handleClose can reach it.
            this.sendProcessor = processor;
            return Mono.from(subscriber -> {
                messages.subscribe(processor);
                processor.subscribe(subscriber);
            });
        }
        else {
            return Mono.error(new IllegalStateException("send() has already been called"));
        }
    }

    /** Return a {@link Mono} backed by the sink that {@link #handleClose} and {@link #handleError} signal. */
    @Override // org.springframework.web.reactive.socket.WebSocketSession
    public Mono<CloseStatus> closeStatus() {
        return this.closeStatusSink.asMono();
    }

    /** Whether receiving can be suspended; consulted by {@link #receive()} to decide on buffering. */
    protected abstract boolean canSuspendReceiving();

    /** Suspend receiving; invoked when the receive publisher reports that reading is paused. */
    protected abstract void suspendReceiving();

    /** Resume receiving; invoked when the receive publisher checks for available data. */
    protected abstract void resumeReceiving();

    /**
     * Send a single message. The result is returned as-is from
     * {@link WebSocketSendProcessor#write(WebSocketMessage)}.
     * @throws IOException if sending fails
     */
    protected abstract boolean sendMessage(WebSocketMessage message) throws IOException;

    /**
     * Hand an inbound message to the receive publisher.
     * @param type the message type (not used by this implementation)
     * @param message the received message
     */
    public void handleMessage(WebSocketMessage.Type type, WebSocketMessage message) {
        this.receivePublisher.handleMessage(message);
    }

    /**
     * Propagate a connection-level error: complete the close-status sink without a
     * value, fail the receive publisher, and cancel + fail the send processor if any.
     */
    public void handleError(Throwable ex) {
        // The emit result is not inspected.
        this.closeStatusSink.tryEmitEmpty();
        this.receivePublisher.onError(ex);
        WebSocketSendProcessor processor = this.sendProcessor;
        if (processor != null) {
            processor.cancel();
            processor.onError(ex);
        }
    }

    /**
     * Propagate a close event: emit the close status, complete the receive
     * publisher, and cancel + complete the send processor if any.
     */
    public void handleClose(CloseStatus closeStatus) {
        // The emit result is not inspected.
        this.closeStatusSink.tryEmitValue(closeStatus);
        this.receivePublisher.onAllDataRead();
        WebSocketSendProcessor processor = this.sendProcessor;
        if (processor != null) {
            processor.cancel();
            processor.onComplete();
        }
    }

    /** Request an unbounded number of signals from the upstream. */
    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    /** No-op: the element type is {@code Void}. */
    @Override // org.reactivestreams.Subscriber
    public void onNext(Void unused) {
    }

    /**
     * Signal the error to the handler-completion sink/processor (when present)
     * and close the session with {@code CloseStatus.SERVER_ERROR}, using the
     * exception message as the reason when it contains text.
     */
    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable ex) {
        if (this.handlerCompletionSink != null) {
            this.handlerCompletionSink.tryEmitError(ex);
        }
        if (this.handlerCompletionMono != null) {
            this.handlerCompletionMono.onError(ex);
        }
        // NOTE(review): CloseStatus.withReason(..) is expected to reject a null or
        // blank reason, and Throwable.getMessage() may well be null. Fall back to the
        // plain status so the session is still closed in that case.
        String reason = ex.getMessage();
        close(hasText(reason) ? CloseStatus.SERVER_ERROR.withReason(reason) : CloseStatus.SERVER_ERROR);
    }

    /** Signal completion to the handler-completion sink/processor (when present) and close the session. */
    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        if (this.handlerCompletionSink != null) {
            this.handlerCompletionSink.tryEmitEmpty();
        }
        if (this.handlerCompletionMono != null) {
            this.handlerCompletionMono.onComplete();
        }
        close();
    }

    /** Return {@code true} if the text is non-null and has at least one non-whitespace character. */
    private static boolean hasText(@Nullable String text) {
        if (text == null) {
            return false;
        }
        for (int i = 0; i < text.length(); i++) {
            if (!Character.isWhitespace(text.charAt(i))) {
                return true;
            }
        }
        return false;
    }

    /**
     * Read publisher that queues inbound messages until the subscriber reads them.
     */
    private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> {

        /** Messages received but not yet read; created via {@code Queues.unbounded}. */
        private volatile Queue<WebSocketMessage> pendingMessages =
                Queues.<WebSocketMessage>unbounded(Queues.SMALL_BUFFER_SIZE).get();

        WebSocketReceivePublisher() {
            super(AbstractListenerWebSocketSession.this.getLogPrefix());
        }

        /** Resume receiving and, if messages are already queued, signal that data is available. */
        @Override // org.springframework.http.server.reactive.AbstractListenerReadPublisher
        protected void checkOnDataAvailable() {
            AbstractListenerWebSocketSession.this.resumeReceiving();
            int pending = this.pendingMessages.size();
            if (rsReadLogger.isTraceEnabled()) {
                rsReadLogger.trace(getLogPrefix() + "checkOnDataAvailable (" + pending + " pending)");
            }
            if (pending > 0) {
                onDataAvailable();
            }
        }

        /** Ask the enclosing session to suspend receiving. */
        @Override // org.springframework.http.server.reactive.AbstractListenerReadPublisher
        protected void readingPaused() {
            AbstractListenerWebSocketSession.this.suspendReceiving();
        }

        /** Return the next queued message, or {@code null} if the queue is empty. */
        @Override // org.springframework.http.server.reactive.AbstractListenerReadPublisher
        @Nullable
        public WebSocketMessage read() throws IOException {
            return this.pendingMessages.poll();
        }

        /**
         * Queue a received message and signal that data is available.
         * @throws IllegalStateException if the queue does not accept the message;
         * already queued messages are released first
         */
        void handleMessage(WebSocketMessage message) {
            if (AbstractListenerWebSocketSession.this.logger.isTraceEnabled()) {
                AbstractListenerWebSocketSession.this.logger.trace(getLogPrefix() + "Received " + message);
            }
            else if (rsReadLogger.isTraceEnabled()) {
                rsReadLogger.trace(getLogPrefix() + "Received " + message);
            }
            if (!this.pendingMessages.offer(message)) {
                discardData();
                throw new IllegalStateException("Too many messages. Please ensure WebSocketSession.receive() is subscribed to.");
            }
            onDataAvailable();
        }

        /** Drain the queue, releasing every pending message. */
        @Override // org.springframework.http.server.reactive.AbstractListenerReadPublisher
        protected void discardData() {
            WebSocketMessage message;
            while ((message = this.pendingMessages.poll()) != null) {
                message.release();
            }
        }
    }

    /**
     * Write processor that sends messages through
     * {@link AbstractListenerWebSocketSession#sendMessage(WebSocketMessage)} and
     * whose readiness is toggled via {@link #setReadyToSend(boolean)}.
     */
    protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor<WebSocketMessage> {

        /** Whether a write is currently possible; starts out {@code true}. */
        private volatile boolean isReady = true;

        WebSocketSendProcessor() {
            // Reuse the receive publisher's log prefix.
            super(AbstractListenerWebSocketSession.this.receivePublisher.getLogPrefix());
        }

        /** Log the message at trace level and delegate to the session's {@code sendMessage}. */
        @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor
        public boolean write(WebSocketMessage message) throws IOException {
            if (AbstractListenerWebSocketSession.this.logger.isTraceEnabled()) {
                AbstractListenerWebSocketSession.this.logger.trace(getLogPrefix() + "Sending " + message);
            }
            else if (rsWriteLogger.isTraceEnabled()) {
                rsWriteLogger.trace(getLogPrefix() + "Sending " + message);
            }
            return AbstractListenerWebSocketSession.this.sendMessage(message);
        }

        /** A message counts as empty when its payload has no readable bytes. */
        @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor
        public boolean isDataEmpty(WebSocketMessage message) {
            return message.getPayload().readableByteCount() == 0;
        }

        /** Writing is possible while the ready flag is set. */
        @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor
        protected boolean isWritePossible() {
            return this.isReady;
        }

        /**
         * Update the ready flag consulted by {@link #isWritePossible()}.
         * @param ready the new value of the flag
         */
        public void setReadyToSend(boolean ready) {
            if (ready && rsWriteLogger.isTraceEnabled()) {
                rsWriteLogger.trace(getLogPrefix() + "Ready to send");
            }
            this.isReady = ready;
        }

        /** Release the given message. */
        @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor
        public void discardData(WebSocketMessage message) {
            message.release();
        }
    }
}
