package com.firefly.codec.websocket.stream.impl;

import com.firefly.codec.common.AbstractConnection;
import com.firefly.codec.common.ConnectionEvent;
import com.firefly.codec.common.ConnectionType;
import com.firefly.codec.http2.model.HttpHeader;
import com.firefly.codec.http2.model.MetaData;
import com.firefly.codec.http2.stream.HTTP2Configuration;
import com.firefly.codec.websocket.decode.Parser;
import com.firefly.codec.websocket.encode.Generator;
import com.firefly.codec.websocket.frame.BinaryFrame;
import com.firefly.codec.websocket.frame.CloseFrame;
import com.firefly.codec.websocket.frame.Frame;
import com.firefly.codec.websocket.frame.PingFrame;
import com.firefly.codec.websocket.frame.PongFrame;
import com.firefly.codec.websocket.frame.TextFrame;
import com.firefly.codec.websocket.frame.WebSocketFrame;
import com.firefly.codec.websocket.model.CloseInfo;
import com.firefly.codec.websocket.model.Extension;
import com.firefly.codec.websocket.model.IncomingFrames;
import com.firefly.codec.websocket.model.WebSocketBehavior;
import com.firefly.codec.websocket.model.extension.AbstractExtension;
import com.firefly.codec.websocket.stream.ExtensionNegotiator;
import com.firefly.codec.websocket.stream.IOState;
import com.firefly.codec.websocket.stream.WebSocketConnection;
import com.firefly.codec.websocket.stream.WebSocketPolicy;
import com.firefly.net.ByteBufferOutputEntry;
import com.firefly.net.SecureSession;
import com.firefly.net.Session;
import com.firefly.utils.concurrent.Callback;
import com.firefly.utils.concurrent.Scheduler;
import com.firefly.utils.function.Action1;
import com.firefly.utils.function.Action2;
import com.firefly.utils.io.BufferUtils;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/firefly/codec/websocket/stream/impl/WebSocketConnectionImpl.class */
public class WebSocketConnectionImpl extends AbstractConnection implements WebSocketConnection, IncomingFrames {
    protected final ConnectionEvent<WebSocketConnection> connectionEvent;
    protected final Parser parser;
    protected final Generator generator;
    protected final WebSocketPolicy policy;
    protected final MetaData.Request upgradeRequest;
    protected final MetaData.Response upgradeResponse;
    protected IOState ioState;
    protected final HTTP2Configuration config;
    protected final ExtensionNegotiator extensionNegotiator;

    public WebSocketConnectionImpl(SecureSession secureSession, Session session, IncomingFrames incomingFrames, WebSocketPolicy webSocketPolicy, MetaData.Request request, MetaData.Response response, HTTP2Configuration hTTP2Configuration) {
        super(secureSession, session);
        this.extensionNegotiator = new ExtensionNegotiator();
        this.connectionEvent = new ConnectionEvent<>(this);
        this.parser = new Parser(webSocketPolicy);
        this.parser.setIncomingFramesHandler(this);
        this.generator = new Generator(webSocketPolicy);
        this.policy = webSocketPolicy;
        this.upgradeRequest = request;
        this.upgradeResponse = response;
        this.config = hTTP2Configuration;
        this.ioState = new IOState();
        this.ioState.onOpened();
        this.extensionNegotiator.setNextOutgoingFrames((frame, callback) -> {
            if (webSocketPolicy.getBehavior() == WebSocketBehavior.CLIENT && (frame instanceof WebSocketFrame)) {
                WebSocketFrame webSocketFrame = (WebSocketFrame) frame;
                if (!webSocketFrame.isMasked()) {
                    webSocketFrame.setMask(generateMask());
                }
            }
            ByteBuffer allocate = ByteBuffer.allocate(28 + frame.getPayloadLength());
            this.generator.generateWholeFrame(frame, allocate);
            BufferUtils.flipToFlush(allocate, 0);
            session.encode(new ByteBufferOutputEntry(callback, allocate));
            if (frame.getType() == Frame.Type.CLOSE && (frame instanceof CloseFrame)) {
                getIOState().onCloseLocal(new CloseInfo(((CloseFrame) frame).getPayload(), false));
                close();
            }
        });
        setNextIncomingFrames(incomingFrames);
        if (this.policy.getBehavior() == WebSocketBehavior.CLIENT) {
            Scheduler.Future scheduleAtFixedRate = scheduler.scheduleAtFixedRate(() -> {
                outgoingFrame(new PingFrame(), new Callback() { // from class: com.firefly.codec.websocket.stream.impl.WebSocketConnectionImpl.1
                    @Override // com.firefly.utils.concurrent.Callback
                    public void succeeded() {
                        WebSocketConnectionImpl.log.info("The websocket connection {} sent ping frame success", Integer.valueOf(WebSocketConnectionImpl.this.getSessionId()));
                    }

                    @Override // com.firefly.utils.concurrent.Callback
                    public void failed(Throwable th) {
                        WebSocketConnectionImpl.log.warn("the websocket connection {} sends ping frame failure. {}", Integer.valueOf(WebSocketConnectionImpl.this.getSessionId()), th.getMessage());
                    }
                });
            }, hTTP2Configuration.getWebsocketPingInterval(), hTTP2Configuration.getWebsocketPingInterval(), TimeUnit.MILLISECONDS);
            onClose(webSocketConnection -> {
                scheduleAtFixedRate.cancel();
            });
        }
    }

    @Override // com.firefly.codec.websocket.stream.WebSocketConnection
    public WebSocketConnection onClose(Action1<WebSocketConnection> action1) {
        return this.connectionEvent.onClose(action1);
    }

    @Override // com.firefly.codec.websocket.stream.WebSocketConnection
    public WebSocketConnection onException(Action2<WebSocketConnection, Throwable> action2) {
        return this.connectionEvent.onException(action2);
    }

    public void notifyClose() {
        this.connectionEvent.notifyClose();
    }

    public void notifyException(Throwable th) {
        this.connectionEvent.notifyException(th);
    }

    @Override // com.firefly.codec.websocket.stream.WebSocketConnection
    public IOState getIOState() {
        return this.ioState;
    }

    @Override // com.firefly.codec.websocket.stream.WebSocketConnection
    public WebSocketPolicy getPolicy() {
        return this.policy;
    }

    @Override // com.firefly.codec.websocket.model.OutgoingFrames
    public void outgoingFrame(Frame frame, Callback callback) {
        this.extensionNegotiator.getOutgoingFrames().outgoingFrame(frame, callback);
    }

    public void setNextIncomingFrames(IncomingFrames incomingFrames) {
        if (incomingFrames != null) {
            this.extensionNegotiator.setNextIncomingFrames(incomingFrames);
            List<Extension> parse = this.extensionNegotiator.parse(this.upgradeResponse.getFields().contains(HttpHeader.SEC_WEBSOCKET_EXTENSIONS) ? this.upgradeResponse : this.upgradeRequest);
            if (parse.isEmpty()) {
                return;
            }
            this.generator.configureFromExtensions(parse);
            this.parser.configureFromExtensions(parse);
            parse.stream().filter(extension -> {
                return extension instanceof AbstractExtension;
            }).map(extension2 -> {
                return (AbstractExtension) extension2;
            }).forEach(abstractExtension -> {
                abstractExtension.setPolicy(this.policy);
            });
        }
    }

    @Override // com.firefly.codec.websocket.model.IncomingFrames
    public void incomingError(Throwable th) {
        Optional.ofNullable(this.extensionNegotiator.getIncomingFrames()).ifPresent(incomingFrames -> {
            incomingFrames.incomingError(th);
        });
    }

    @Override // com.firefly.codec.websocket.model.IncomingFrames
    public void incomingFrame(Frame frame) {
        switch (frame.getType()) {
            case PING:
                outgoingFrame(new PongFrame(), Callback.NOOP);
                break;
            case CLOSE:
                this.ioState.onCloseRemote(new CloseInfo(((CloseFrame) frame).getPayload(), false));
                close();
                break;
            case PONG:
                log.info("The websocket connection {} received pong frame", Integer.valueOf(getSessionId()));
                break;
        }
        Optional.ofNullable(this.extensionNegotiator.getIncomingFrames()).ifPresent(incomingFrames -> {
            incomingFrames.incomingFrame(frame);
        });
    }

    @Override // com.firefly.codec.common.AbstractConnection, com.firefly.codec.common.ConnectionExtInfo
    public boolean isEncrypted() {
        return this.secureSession != null;
    }

    @Override // com.firefly.codec.common.ConnectionExtInfo
    public ConnectionType getConnectionType() {
        return ConnectionType.WEB_SOCKET;
    }

    @Override // com.firefly.codec.websocket.stream.WebSocketConnection
    public byte[] generateMask() {
        byte[] bArr = new byte[4];
        ThreadLocalRandom.current().nextBytes(bArr);
        return bArr;
    }

    @Override // com.firefly.codec.websocket.stream.WebSocketConnection
    public CompletableFuture<Boolean> sendText(String str) {
        TextFrame textFrame = new TextFrame();
        textFrame.setPayload(str);
        final CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        outgoingFrame(textFrame, new Callback() { // from class: com.firefly.codec.websocket.stream.impl.WebSocketConnectionImpl.2
            @Override // com.firefly.utils.concurrent.Callback
            public void succeeded() {
                completableFuture.complete(true);
            }

            @Override // com.firefly.utils.concurrent.Callback
            public void failed(Throwable th) {
                completableFuture.completeExceptionally(th);
            }
        });
        return completableFuture;
    }

    @Override // com.firefly.codec.websocket.stream.WebSocketConnection
    public CompletableFuture<Boolean> sendData(byte[] bArr) {
        return _sendData(bArr, (v0, v1) -> {
            v0.setPayload(v1);
        });
    }

    @Override // com.firefly.codec.websocket.stream.WebSocketConnection
    public CompletableFuture<Boolean> sendData(ByteBuffer byteBuffer) {
        return _sendData(byteBuffer, (v0, v1) -> {
            v0.setPayload(v1);
        });
    }

    private <T> CompletableFuture<Boolean> _sendData(T t, Action2<BinaryFrame, T> action2) {
        BinaryFrame binaryFrame = new BinaryFrame();
        action2.call(binaryFrame, t);
        final CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        outgoingFrame(binaryFrame, new Callback() { // from class: com.firefly.codec.websocket.stream.impl.WebSocketConnectionImpl.3
            @Override // com.firefly.utils.concurrent.Callback
            public void succeeded() {
                completableFuture.complete(true);
            }

            @Override // com.firefly.utils.concurrent.Callback
            public void failed(Throwable th) {
                completableFuture.completeExceptionally(th);
            }
        });
        return completableFuture;
    }

    @Override // com.firefly.codec.websocket.stream.WebSocketConnection
    public MetaData.Request getUpgradeRequest() {
        return this.upgradeRequest;
    }

    @Override // com.firefly.codec.websocket.stream.WebSocketConnection
    public MetaData.Response getUpgradeResponse() {
        return this.upgradeResponse;
    }

    public ExtensionNegotiator getExtensionNegotiator() {
        return this.extensionNegotiator;
    }

    public Parser getParser() {
        return this.parser;
    }

    public Generator getGenerator() {
        return this.generator;
    }
}
