package com.firefly.net.tcp.codec.flex.stream.impl;

import com.firefly.net.tcp.TcpConnection;
import com.firefly.net.tcp.codec.flex.decode.MetaInfoParser;
import com.firefly.net.tcp.codec.flex.encode.MetaInfoGenerator;
import com.firefly.net.tcp.codec.flex.model.Request;
import com.firefly.net.tcp.codec.flex.model.Response;
import com.firefly.net.tcp.codec.flex.protocol.ControlFrame;
import com.firefly.net.tcp.codec.flex.protocol.DataFrame;
import com.firefly.net.tcp.codec.flex.protocol.DisconnectionFrame;
import com.firefly.net.tcp.codec.flex.protocol.PingFrame;
import com.firefly.net.tcp.codec.flex.stream.FlexConfiguration;
import com.firefly.net.tcp.codec.flex.stream.FlexConnection;
import com.firefly.net.tcp.codec.flex.stream.Session;
import com.firefly.net.tcp.codec.flex.stream.Stream;
import com.firefly.utils.Assert;
import com.firefly.utils.concurrent.Callback;
import com.firefly.utils.io.IO;
import com.firefly.utils.log.LogConfigParser;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/firefly/net/tcp/codec/flex/stream/impl/FlexConnectionImpl.class */
public class FlexConnectionImpl implements FlexConnection {
    protected static final Logger log = LoggerFactory.getLogger(LogConfigParser.DEFAULT_LOG_NAME);
    public static final String CONTEXT_KEY = "_context";
    public static final String CTX_LISTENER_KEY = "_contextListener";
    public static final String METAINFO_BYTES_KEY = "_receivedRequestMetaInfoBytesKey";
    protected final TcpConnection tcpConnection;
    protected final Session session;
    protected final FlexConfiguration configuration;

    /* loaded from: input_file:com/firefly/net/tcp/codec/flex/stream/impl/FlexConnectionImpl$NewRequestStreamListener.class */
    protected class NewRequestStreamListener implements Stream.Listener {
        protected final FlexConnection.Listener listener;
        protected ByteArrayOutputStream metaInfoByteArrayOutputStream = new ByteArrayOutputStream();

        public NewRequestStreamListener(FlexConnection.Listener listener) {
            this.listener = listener;
        }

        protected void saveData(byte[] bArr) {
            try {
                this.metaInfoByteArrayOutputStream.write(bArr);
            } catch (IOException e) {
            }
        }

        @Override // com.firefly.net.tcp.codec.flex.stream.Stream.Listener
        public void onControl(ControlFrame controlFrame) {
            if (!controlFrame.isEndFrame()) {
                saveData(controlFrame.getData());
                return;
            }
            saveData(controlFrame.getData());
            IO.close((OutputStream) this.metaInfoByteArrayOutputStream);
            Stream stream = FlexConnectionImpl.this.getSession().getStream(controlFrame.getStreamId());
            Assert.state(stream != null, "The stream has not been created");
            FlexContext context = FlexConnectionImpl.this.getContext(stream);
            Assert.state(context != null, "The flex context has not been created");
            context.setResponse((Response) ((MetaInfoParser) Optional.ofNullable(FlexConnectionImpl.this.configuration.getMetaInfoParser()).orElse(MetaInfoParser.DEFAULT)).parse(this.metaInfoByteArrayOutputStream.toByteArray(), Response.class));
            Assert.state(context.getResponse() != null, "Parse response meta info failure");
            try {
                this.listener.newResponse(context);
                if (controlFrame.isEndStream()) {
                    this.listener.messageComplete(context);
                }
            } catch (Exception e) {
                this.listener.exception(context, e);
            }
        }

        @Override // com.firefly.net.tcp.codec.flex.stream.Stream.Listener
        public void onData(DataFrame dataFrame) {
            Stream stream = FlexConnectionImpl.this.getSession().getStream(dataFrame.getStreamId());
            Assert.state(stream != null, "The stream has not been created. id: " + dataFrame.getStreamId());
            FlexConnectionImpl.this.onDataFrame(stream, dataFrame, this.listener);
        }
    }

    /* loaded from: input_file:com/firefly/net/tcp/codec/flex/stream/impl/FlexConnectionImpl$ReceivedRequestSessionListener.class */
    protected class ReceivedRequestSessionListener implements Session.Listener {
        protected final FlexConnection.Listener listener;

        public ReceivedRequestSessionListener(FlexConnection.Listener listener) {
            this.listener = listener;
        }

        protected FlexContext createContext(Stream stream) {
            Request request = (Request) ((MetaInfoParser) Optional.ofNullable(FlexConnectionImpl.this.configuration.getMetaInfoParser()).orElse(MetaInfoParser.DEFAULT)).parse(getMetaInfoByteArrayOutputStream(stream).toByteArray(), Request.class);
            stream.getAttributes().remove(FlexConnectionImpl.METAINFO_BYTES_KEY);
            Assert.state(request != null, "Parse request meta info failure");
            return new FlexContext(request, stream, FlexConnectionImpl.this);
        }

        protected ByteArrayOutputStream getMetaInfoByteArrayOutputStream(Stream stream) {
            return (ByteArrayOutputStream) stream.getAttribute(FlexConnectionImpl.METAINFO_BYTES_KEY);
        }

        protected void saveData(Stream stream, byte[] bArr) {
            try {
                getMetaInfoByteArrayOutputStream(stream).write(bArr);
            } catch (IOException e) {
            }
        }

        protected void init(Stream stream) {
            stream.setAttribute(FlexConnectionImpl.METAINFO_BYTES_KEY, new ByteArrayOutputStream());
        }

        protected void onControlFrame(Stream stream, ControlFrame controlFrame) {
            if (!controlFrame.isEndFrame()) {
                saveData(stream, controlFrame.getData());
                return;
            }
            saveData(stream, controlFrame.getData());
            FlexContext createContext = createContext(stream);
            stream.setAttribute(FlexConnectionImpl.CONTEXT_KEY, createContext);
            stream.setAttribute(FlexConnectionImpl.CTX_LISTENER_KEY, this.listener);
            try {
                this.listener.newRequest(createContext);
                if (controlFrame.isEndStream()) {
                    this.listener.messageComplete(createContext);
                }
            } catch (Exception e) {
                this.listener.exception(createContext, e);
            }
        }

        @Override // com.firefly.net.tcp.codec.flex.stream.Session.Listener
        public Stream.Listener onNewStream(final Stream stream, ControlFrame controlFrame) {
            init(stream);
            onControlFrame(stream, controlFrame);
            return new Stream.Listener() { // from class: com.firefly.net.tcp.codec.flex.stream.impl.FlexConnectionImpl.ReceivedRequestSessionListener.1
                @Override // com.firefly.net.tcp.codec.flex.stream.Stream.Listener
                public void onControl(ControlFrame controlFrame2) {
                    ReceivedRequestSessionListener.this.onControlFrame(stream, controlFrame2);
                }

                @Override // com.firefly.net.tcp.codec.flex.stream.Stream.Listener
                public void onData(DataFrame dataFrame) {
                    FlexConnectionImpl.this.onDataFrame(stream, dataFrame, ReceivedRequestSessionListener.this.listener);
                }
            };
        }

        @Override // com.firefly.net.tcp.codec.flex.stream.Session.Listener
        public void onPing(Session session, PingFrame pingFrame) {
            if (FlexConnectionImpl.log.isDebugEnabled()) {
                FlexConnectionImpl.log.debug("Connection {} received ping {}", Integer.valueOf(FlexConnectionImpl.this.getSessionId()), pingFrame.toString());
            }
        }

        @Override // com.firefly.net.tcp.codec.flex.stream.Session.Listener
        public void onDisconnect(Session session, DisconnectionFrame disconnectionFrame) {
            IO.close(FlexConnectionImpl.this);
        }
    }

    public FlexConnectionImpl(FlexConfiguration flexConfiguration, TcpConnection tcpConnection, Session session) {
        this.tcpConnection = tcpConnection;
        this.session = session;
        this.configuration = flexConfiguration;
    }

    @Override // com.firefly.net.Connection
    public Object getAttachment() {
        return this.tcpConnection.getAttachment();
    }

    @Override // com.firefly.net.Connection
    public void setAttachment(Object obj) {
        this.tcpConnection.setAttachment(obj);
    }

    @Override // com.firefly.net.Connection
    public int getSessionId() {
        return this.tcpConnection.getSessionId();
    }

    @Override // com.firefly.net.Connection
    public long getOpenTime() {
        return this.tcpConnection.getOpenTime();
    }

    @Override // com.firefly.net.Connection
    public long getCloseTime() {
        return this.tcpConnection.getCloseTime();
    }

    @Override // com.firefly.net.Connection
    public long getDuration() {
        return this.tcpConnection.getDuration();
    }

    @Override // com.firefly.net.Connection
    public long getLastReadTime() {
        return this.tcpConnection.getLastReadTime();
    }

    @Override // com.firefly.net.Connection
    public long getLastWrittenTime() {
        return this.tcpConnection.getLastWrittenTime();
    }

    @Override // com.firefly.net.Connection
    public long getLastActiveTime() {
        return this.tcpConnection.getLastActiveTime();
    }

    @Override // com.firefly.net.Connection
    public long getReadBytes() {
        return this.tcpConnection.getReadBytes();
    }

    @Override // com.firefly.net.Connection
    public long getWrittenBytes() {
        return this.tcpConnection.getWrittenBytes();
    }

    @Override // com.firefly.net.Connection
    public long getIdleTimeout() {
        return this.tcpConnection.getIdleTimeout();
    }

    @Override // com.firefly.net.Connection
    public long getMaxIdleTimeout() {
        return this.tcpConnection.getMaxIdleTimeout();
    }

    @Override // com.firefly.net.Connection
    public boolean isOpen() {
        return this.tcpConnection.isOpen();
    }

    @Override // com.firefly.net.Connection
    public boolean isClosed() {
        return this.tcpConnection.isClosed();
    }

    @Override // com.firefly.net.Connection
    public InetSocketAddress getLocalAddress() {
        return this.tcpConnection.getLocalAddress();
    }

    @Override // com.firefly.net.Connection
    public InetSocketAddress getRemoteAddress() {
        return this.tcpConnection.getRemoteAddress();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.tcpConnection.close();
    }

    @Override // com.firefly.net.tcp.codec.flex.stream.FlexConnection
    public Session getSession() {
        return this.session;
    }

    @Override // com.firefly.net.tcp.codec.flex.stream.FlexConnection
    public void newRequest(Request request, FlexConnection.Listener listener) {
        Assert.notNull(request, "The request must be not null");
        Assert.notNull(listener, "The context listener must be not null");
        Stream newStream = getSession().newStream(new ControlFrame(false, 0, true, generate(request)), Callback.NOOP, new NewRequestStreamListener(listener));
        FlexContext flexContext = new FlexContext(request, newStream, this);
        newStream.setAttribute(CONTEXT_KEY, flexContext);
        newStream.setAttribute(CTX_LISTENER_KEY, listener);
        listener.newRequest(flexContext);
    }

    private byte[] generate(Request request) {
        return ((MetaInfoGenerator) Optional.ofNullable(this.configuration.getMetaInfoGenerator()).orElse(MetaInfoGenerator.DEFAULT)).generate(request);
    }

    @Override // com.firefly.net.tcp.codec.flex.stream.FlexConnection
    public void onRequest(FlexConnection.Listener listener) {
        Assert.notNull(listener, "The context listener must be not null");
        this.session.setListener(new ReceivedRequestSessionListener(listener));
    }

    @Override // com.firefly.net.tcp.codec.flex.stream.FlexConnection
    public FlexConfiguration getConfiguration() {
        return this.configuration;
    }

    protected FlexContext getContext(Stream stream) {
        return (FlexContext) stream.getAttribute(CONTEXT_KEY);
    }

    protected void onDataFrame(Stream stream, DataFrame dataFrame, FlexConnection.Listener listener) {
        FlexContext context = getContext(stream);
        Assert.state(context != null, "The flex context has not been created");
        try {
            if (dataFrame.isEndFrame()) {
                Optional.ofNullable(dataFrame.getData()).ifPresent(bArr -> {
                    listener.content(context, bArr);
                });
                listener.contentComplete(context);
                if (dataFrame.isEndStream()) {
                    listener.messageComplete(context);
                }
            } else {
                Optional.ofNullable(dataFrame.getData()).ifPresent(bArr2 -> {
                    listener.content(context, bArr2);
                });
            }
        } catch (Exception e) {
            listener.exception(context, e);
        }
    }
}
