package com.firefly.net.tcp.codec.flex.stream.impl;

import com.firefly.net.tcp.TcpConnection;
import com.firefly.net.tcp.TcpPerformanceParameter;
import com.firefly.net.tcp.codec.flex.encode.FrameGenerator;
import com.firefly.net.tcp.codec.flex.protocol.ControlFrame;
import com.firefly.net.tcp.codec.flex.protocol.DataFrame;
import com.firefly.net.tcp.codec.flex.protocol.DisconnectionFrame;
import com.firefly.net.tcp.codec.flex.protocol.Frame;
import com.firefly.net.tcp.codec.flex.protocol.FrameType;
import com.firefly.net.tcp.codec.flex.protocol.MessageFrame;
import com.firefly.net.tcp.codec.flex.protocol.PingFrame;
import com.firefly.net.tcp.codec.flex.stream.Context;
import com.firefly.net.tcp.codec.flex.stream.FlexConnection;
import com.firefly.net.tcp.codec.flex.stream.Session;
import com.firefly.net.tcp.codec.flex.stream.Stream;
import com.firefly.net.tcp.codec.flex.stream.impl.StreamStateTransferMap;
import com.firefly.net.tcp.flex.metric.FlexMetric;
import com.firefly.utils.Assert;
import com.firefly.utils.concurrent.Callback;
import com.firefly.utils.concurrent.CountingCallback;
import com.firefly.utils.concurrent.Scheduler;
import com.firefly.utils.function.Action0;
import com.firefly.utils.io.IO;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/firefly/net/tcp/codec/flex/stream/impl/FlexSession.class */
public class FlexSession implements Session, Callback {
    /** Logger shared by the session; events are reported under the "firefly-system" name. */
    protected static final Logger log = LoggerFactory.getLogger("firefly-system");

    /** Source of locally allocated stream ids; seeded by the constructor and advanced by {@link #generateId()}. */
    protected final AtomicInteger idGenerator;

    /** Underlying TCP connection that every frame of this session is written to. */
    protected final TcpConnection connection;

    /** Metrics sink updated when streams are opened, closed or cleared. */
    protected final FlexMetric flexMetric;

    /** Idle timeout handed to every stream registered through {@link #notifyNewStream(FlexStream, boolean)}. */
    protected final long streamMaxIdleTime;

    /** Scheduler passed to each {@link FlexStream} created by this session. */
    protected final Scheduler scheduler;

    /** Session-level listener; {@code null} until {@link #setListener(Session.Listener)} is called. */
    protected volatile Session.Listener listener;

    /** Streams currently registered on this session, keyed by stream id. */
    protected final ConcurrentMap<Integer, Stream> streamMap = new ConcurrentHashMap<>();

    /** Backing store for the session's context attributes. */
    protected final LazyContextAttribute attribute = new LazyContextAttribute();

    /**
     * Lookup table that maps {@link FrameType} ordinals to the small integers used as case labels
     * by the {@code switch} in {@code notifyFrame}: CONTROL = 1, DATA = 2, PING = 3, DISCONNECTION = 4.
     * Ordinals that are never assigned keep the array default of 0.
     *
     * <p>Decompiler notes: renamed from com.firefly.net.tcp.codec.flex.stream.impl.FlexSession$4
     * (reason: invalid class name); loaded from
     * input_file:com/firefly/net/tcp/codec/flex/stream/impl/FlexSession$4.class.
     * NOTE(review): this has the shape of a compiler-generated enum switch map recovered by the
     * decompiler rather than hand-written code - confirm before editing by hand.
     */
    static /* synthetic */ class AnonymousClass4 {
        // Indexed by FrameType.ordinal(); sized from the constants present when the class is initialised.
        static final /* synthetic */ int[] $SwitchMap$com$firefly$net$tcp$codec$flex$protocol$FrameType = new int[FrameType.values().length];

        static {
            // Each assignment has its own guard so that one constant failing to resolve
            // (NoSuchFieldError) does not prevent the remaining entries from being filled in.
            try {
                $SwitchMap$com$firefly$net$tcp$codec$flex$protocol$FrameType[FrameType.CONTROL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the entry stays 0.
            }
            try {
                $SwitchMap$com$firefly$net$tcp$codec$flex$protocol$FrameType[FrameType.DATA.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the entry stays 0.
            }
            try {
                $SwitchMap$com$firefly$net$tcp$codec$flex$protocol$FrameType[FrameType.PING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the entry stays 0.
            }
            try {
                $SwitchMap$com$firefly$net$tcp$codec$flex$protocol$FrameType[FrameType.DISCONNECTION.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Deliberately ignored: the entry stays 0.
            }
        }
    }

    /**
     * Creates a session bound to one TCP connection.
     *
     * @param i             first stream id this side will hand out; later ids advance from it
     * @param tcpConnection connection that frames are written to
     * @param flexMetric    metrics sink for stream counters
     * @param j             idle timeout applied to every stream registered on this session
     * @param scheduler     scheduler handed to each stream created by this session
     */
    public FlexSession(int i, TcpConnection tcpConnection, FlexMetric flexMetric, long j, Scheduler scheduler) {
        // Collaborators first, then the values derived from the numeric arguments.
        this.connection = tcpConnection;
        this.flexMetric = flexMetric;
        this.scheduler = scheduler;
        this.streamMaxIdleTime = j;
        this.idGenerator = new AtomicInteger(i);
    }

    /**
     * Looks up a registered stream by id.
     *
     * @param i the stream id
     * @return the stream registered under that id, or {@code null} if there is none
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.Session
    public Stream getStream(int i) {
        final Integer streamId = i;
        return streamMap.get(streamId);
    }

    /**
     * Returns the session's stream registry.
     *
     * @return the map backing this session (not a copy), keyed by stream id
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.Session
    public Map<Integer, Stream> getAllStreams() {
        final Map<Integer, Stream> registry = streamMap;
        return registry;
    }

    /**
     * Unregisters a stream that has reached its final state and notifies interested parties.
     *
     * <p>The stream's own {@code onClose()} hook always runs. The active-stream gauge is only
     * decremented when the stream was still registered, so a repeated close (or a close that
     * arrives after {@link #clear()} already accounted for the stream) cannot push the gauge
     * below the real number of open streams.
     *
     * @param flexStream the stream being closed
     */
    public void notifyCloseStream(FlexStream flexStream) {
        final boolean wasRegistered = streamMap.remove(Integer.valueOf(flexStream.getId())) != null;
        flexStream.onClose();
        if (wasRegistered) {
            flexMetric.getActiveStreamCount().dec();
        }
        if (log.isDebugEnabled()) {
            log.debug("Closed stream {}", Integer.valueOf(flexStream.getId()));
        }

        // The connection-level listener and its context travel as stream attributes; both are optional.
        FlexConnection.Listener connectionListener =
                (FlexConnection.Listener) flexStream.getAttribute(FlexConnectionImpl.CTX_LISTENER_KEY);
        Context context = (Context) flexStream.getAttribute(FlexConnectionImpl.CONTEXT_KEY);
        if (connectionListener != null && context != null) {
            connectionListener.close(context);
        }
    }

    /**
     * Registers a freshly created stream, arms its idle timeout and updates the metrics.
     *
     * @param flexStream the stream to register; its id must not be registered yet
     * @param z          {@code true} when the stream was opened locally, {@code false} when the peer opened it
     */
    public void notifyNewStream(FlexStream flexStream, boolean z) {
        final Stream alreadyRegistered = streamMap.putIfAbsent(flexStream.getId(), flexStream);
        Assert.state(alreadyRegistered == null, "The stream " + flexStream.getId() + " has been created.");

        flexStream.setIdleTimeout(streamMaxIdleTime);

        if (log.isDebugEnabled()) {
            final String pattern = z ? "Create a new local stream: {}" : "Create a new remote stream: {}";
            log.debug(pattern, flexStream.toString());
        }

        flexMetric.getActiveStreamCount().inc();
        flexMetric.getRequestMeter().mark();
    }

    /**
     * Dispatches a decoded inbound frame to the handler for its type.
     *
     * <p>CONTROL and DATA frames are routed to their stream, PING frames are answered or reported,
     * and a DISCONNECTION frame closes the connection. Any other frame type is ignored.
     *
     * @param frame the frame received from the peer
     */
    public void notifyFrame(Frame frame) {
        // Switch on the enum itself: no ordinal lookup table and no unrelated constants as case labels.
        switch (frame.getType()) {
            case CONTROL:
                handleControlFrame((ControlFrame) frame);
                break;
            case DATA:
                handleDataFrame((DataFrame) frame);
                break;
            case PING:
                handlePingFrame((PingFrame) frame);
                break;
            case DISCONNECTION:
                handleDisconnectionFrame((DisconnectionFrame) frame);
                break;
            default:
                break;
        }
    }

    /** Handles a CONTROL frame: opens a remote stream for an unknown id, otherwise feeds the existing stream. */
    private void handleControlFrame(ControlFrame controlFrame) {
        Stream known = streamMap.get(Integer.valueOf(controlFrame.getStreamId()));
        if (known == null) {
            openRemoteStream(controlFrame);
            return;
        }

        FlexStream stream = (FlexStream) known;
        stream.notIdle();
        if (!controlFrame.isEndStream()) {
            stream.getListener().onControl(controlFrame);
            return;
        }

        // The state is advanced before the listener runs; the stream is unregistered only afterwards.
        Stream.State nextState = receiveEndStream(stream);
        stream.getListener().onControl(controlFrame);
        if (nextState == Stream.State.CLOSED) {
            notifyCloseStream(stream);
        }
    }

    /** Creates and registers the stream announced by the peer's first CONTROL frame. */
    private void openRemoteStream(ControlFrame controlFrame) {
        Stream.State initialState = controlFrame.isEndStream()
                ? StreamStateTransferMap.getNextState(Stream.State.OPEN, StreamStateTransferMap.Op.RECV_ES)
                : Stream.State.OPEN;
        FlexStream remoteStream =
                new FlexStream(controlFrame.getStreamId(), this, null, initialState, false, scheduler);
        notifyNewStream(remoteStream, false);

        // Read the volatile field once so the null check and the call see the same listener.
        Session.Listener sessionListener = listener;
        if (sessionListener != null) {
            remoteStream.setListener(sessionListener.onNewStream(remoteStream, controlFrame));
        }
    }

    /** Handles a DATA frame; the target stream must already be registered. */
    private void handleDataFrame(DataFrame dataFrame) {
        FlexStream stream = (FlexStream) streamMap.get(Integer.valueOf(dataFrame.getStreamId()));
        Assert.state(stream != null, "The stream " + dataFrame.getStreamId() + " has been not created");

        stream.notIdle();
        if (!dataFrame.isEndStream()) {
            stream.getListener().onData(dataFrame);
            return;
        }

        Stream.State nextState = receiveEndStream(stream);
        stream.getListener().onData(dataFrame);
        if (nextState == Stream.State.CLOSED) {
            notifyCloseStream(stream);
        }
    }

    /** Applies the "received end-of-stream" transition to the stream and returns the resulting state. */
    private Stream.State receiveEndStream(FlexStream stream) {
        Stream.State nextState =
                StreamStateTransferMap.getNextState(stream.getState(), StreamStateTransferMap.Op.RECV_ES);
        stream.setState(nextState);
        return nextState;
    }

    /** Answers a ping request with a reply frame, or reports a ping reply to the session listener. */
    private void handlePingFrame(PingFrame pingFrame) {
        if (!pingFrame.isReply()) {
            log.info("Connection {} received ping request.", Integer.valueOf(connection.getSessionId()));
            sendFrame(new PingFrame(true));
            return;
        }

        log.info("Connection {} received ping reply.", Integer.valueOf(connection.getSessionId()));
        Session.Listener sessionListener = listener;
        if (sessionListener != null) {
            sessionListener.onPing(this, pingFrame);
        }
    }

    /** Reports the peer's disconnection request to the session listener, then closes the connection. */
    private void handleDisconnectionFrame(DisconnectionFrame disconnectionFrame) {
        log.info("Received disconnection frame" + disconnectionFrame);
        Session.Listener sessionListener = listener;
        if (sessionListener != null) {
            sessionListener.onDisconnect(this, disconnectionFrame);
        }
        IO.close(connection);
    }

    /**
     * Allocates the next locally generated stream id.
     *
     * @return the id to use; the internal counter is then advanced by two
     */
    protected int generateId() {
        final int stride = 2;
        return idGenerator.getAndAdd(stride);
    }

    /**
     * Opens a new locally initiated stream and sends its opening control frame.
     *
     * @param controlFrame template for the opening frame; its flags and data are copied under the new stream id
     * @param callback     notified with the outcome of writing the opening frame
     * @param listener     stream listener; must not be {@code null}
     * @return the newly registered stream
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.Session
    public Stream newStream(ControlFrame controlFrame, Callback callback, Stream.Listener listener) {
        Assert.notNull(listener, "The stream listener must be not null");

        final int streamId = generateId();
        final FlexStream localStream =
                new FlexStream(streamId, this, listener, Stream.State.OPEN, true, this.scheduler);
        notifyNewStream(localStream, true);

        // Re-issue the caller's frame under the id that was just allocated.
        final ControlFrame opening = new ControlFrame(
                controlFrame.isEndStream(), streamId, controlFrame.isEndFrame(), controlFrame.getData());
        sendFrame(opening, callback);
        return localStream;
    }

    /**
     * Installs the session-level listener.
     *
     * @param listener the listener to install; rejected by the assertion when {@code null}
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.Session
    public void setListener(Session.Listener listener) {
        final String rejection = "The session listener must be not null";
        Assert.notNull(listener, rejection);
        this.listener = listener;
    }

    /**
     * Sends a ping frame to the peer.
     *
     * @param pingFrame the frame to send
     * @return a future that follows the outcome of the write
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.Session
    public CompletableFuture<Boolean> ping(PingFrame pingFrame) {
        final CompletableFuture<Boolean> outcome = sendFrame(pingFrame);
        return outcome;
    }

    /**
     * Sends a disconnection frame and then closes the underlying connection.
     *
     * @param disconnectionFrame the frame announcing the disconnect
     * @return a future that follows the outcome of writing the frame
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.Session
    public CompletableFuture<Boolean> disconnect(DisconnectionFrame disconnectionFrame) {
        final CompletableFuture<Boolean> pendingWrite = sendFrame(disconnectionFrame);
        // The close is requested right after the write has been handed to the connection.
        IO.close(connection);
        return pendingWrite;
    }

    /**
     * Sends a single frame and exposes the outcome as a future.
     *
     * @param frame the frame to send
     * @return a future completed with {@code true} on success, or exceptionally with the write failure
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.Session
    public CompletableFuture<Boolean> sendFrame(Frame frame) {
        final CompletableFuture<Boolean> result = new CompletableFuture<>();
        final Callback bridge = new Callback() {
            @Override
            public void succeeded() {
                result.complete(Boolean.TRUE);
            }

            @Override
            public void failed(Throwable cause) {
                result.completeExceptionally(cause);
            }
        };
        sendFrame(frame, bridge);
        return result;
    }

    /**
     * Sends a batch of frames and exposes the combined outcome as a future.
     *
     * @param list the frames to send
     * @return a future completed with {@code true} on success, or exceptionally with the write failure
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.Session
    public CompletableFuture<Boolean> sendFrames(List<Frame> list) {
        final CompletableFuture<Boolean> result = new CompletableFuture<>();
        final Callback bridge = new Callback() {
            @Override
            public void succeeded() {
                result.complete(Boolean.TRUE);
            }

            @Override
            public void failed(Throwable cause) {
                result.completeExceptionally(cause);
            }
        };
        sendFrames(list, bridge);
        return result;
    }

    /**
     * Writes a frame and, once the write succeeds, applies the "sent end-of-stream" transition
     * to the stream the frame belongs to.
     *
     * @param frame    the frame to send
     * @param callback notified after the session has processed the write outcome
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.Session
    public void sendFrame(final Frame frame, Callback callback) {
        final Callback.Nested completion = new Callback.Nested(callback) {
            @Override
            public void succeeded() {
                // Only CONTROL/DATA frames flagged end-of-stream yield a stream here.
                Optional<FlexStream> ending = FlexSession.this.getStream(frame, MessageFrame::isEndStream);
                if (ending.isPresent()) {
                    FlexStream endedStream = ending.get();
                    if (FlexSession.log.isDebugEnabled()) {
                        FlexSession.log.debug("The stream {} sends message frame success.", endedStream.toString());
                    }
                    Stream.State afterSend = StreamStateTransferMap.getNextState(
                            endedStream.getState(), StreamStateTransferMap.Op.SEND_ES);
                    endedStream.setState(afterSend);
                    if (afterSend == Stream.State.CLOSED) {
                        FlexSession.this.notifyCloseStream(endedStream);
                    }
                }
                super.succeeded();
                FlexSession.this.succeeded();
            }

            @Override
            public void failed(Throwable cause) {
                super.failed(cause);
                FlexSession.this.failed(cause);
            }
        };
        _writeFrame(frame, completion);
    }

    /**
     * Resolves the registered stream that a frame belongs to.
     *
     * <p>The result is empty when the frame is {@code null}, is not a CONTROL or DATA frame,
     * is rejected by {@code predicate}, or names a stream id that is not registered.
     *
     * @param frame     the frame to inspect; may be {@code null}
     * @param predicate extra condition the message frame must satisfy
     * @return the matching stream, if any
     */
    protected Optional<FlexStream> getStream(Frame frame, Predicate<MessageFrame> predicate) {
        return Optional.ofNullable(frame)
                // Only CONTROL and DATA frames are message frames that carry a stream id.
                .filter(candidate -> candidate.getType() == FrameType.CONTROL
                        || candidate.getType() == FrameType.DATA)
                .map(candidate -> (MessageFrame) candidate)
                .filter(predicate)
                .map(MessageFrame::getStreamId)
                // A missing registration maps to null, which Optional turns into empty.
                .map(streamMap::get)
                .map(stream -> (FlexStream) stream);
    }

    /**
     * Session-level hook invoked after a frame has been written successfully.
     * The session takes no action of its own here.
     */
    @Override // com.firefly.utils.concurrent.Callback
    public void succeeded() {
        // Intentionally empty.
    }

    /**
     * Session-level hook invoked when writing a frame failed: logs the failure and closes the connection.
     *
     * <p>The throwable is handed to the logger as the trailing argument so the stack trace and
     * cause chain are kept. A {@code null} throwable is tolerated so the connection is still closed.
     *
     * @param th the cause of the write failure; may be {@code null}
     */
    @Override // com.firefly.utils.concurrent.Callback
    public void failed(Throwable th) {
        final String reason = (th == null) ? null : th.getMessage();
        log.error("Write frame error. {}", reason, th);
        IO.close(connection);
    }

    /**
     * Releases every stream still registered on this session: the active-stream gauge is lowered
     * by the number of remaining streams, each stream's {@code onClose()} hook is run, and the
     * registry is emptied.
     */
    public void clear() {
        final int remaining = streamMap.size();
        log.info("Connection closed. It will clear remaining {} streams.", Integer.valueOf(remaining));
        flexMetric.getActiveStreamCount().dec(remaining);

        for (Stream leftover : streamMap.values()) {
            ((FlexStream) leftover).onClose();
        }
        streamMap.clear();
    }

    /**
     * Tells whether the connection can still accept outbound frames.
     *
     * @return {@code true} unless the connection is waiting for close, has shut down its output, or is closed
     */
    protected boolean canWrite() {
        return !connection.isWaitingForClose()
                && !connection.isShutdownOutput()
                && !connection.isClosed();
    }

    /**
     * Encodes a frame and writes it to the connection.
     *
     * <p>When the connection is no longer writable the callback fails immediately with an
     * {@link IOException} and nothing is written. Otherwise the stream the frame belongs to
     * (if any) is marked as not idle before the write is issued.
     *
     * @param frame    the frame to write
     * @param callback receives the outcome of the write
     */
    protected void _writeFrame(Frame frame, Callback callback) {
        if (!canWrite()) {
            log.warn("The connection is closed. It can not write frame {}", frame);
            callback.failed(new IOException("The connection is closed"));
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Send a frame: {}", frame);
        }

        // Any message frame counts as activity on its stream, so accept every candidate.
        getStream(frame, anyMessageFrame -> true).ifPresent(FlexStream::notIdle);

        connection.write(FrameGenerator.generate(frame), callback::succeeded, callback::failed);
    }

    /**
     * Sends a batch of frames and reports a single combined outcome to {@code callback}.
     *
     * <p>An empty batch completes the callback successfully right away. Without that shortcut no
     * write would ever complete to drive the counting callback, so the caller would presumably
     * never be notified.
     *
     * @param list     the frames to send, in order
     * @param callback notified once for the whole batch
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.Session
    public void sendFrames(List<Frame> list, Callback callback) {
        if (list.isEmpty()) {
            // Nothing to write: the batch is trivially done.
            if (callback != null) {
                callback.succeeded();
            }
            return;
        }

        CountingCallback countingCallback = new CountingCallback(callback, list.size());
        list.forEach(frame -> sendFrame(frame, countingCallback));
    }

    /**
     * Returns the attributes attached to this session.
     *
     * @return the attribute map supplied by the session's attribute holder
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.ContextAttribute
    public Map<String, Object> getAttributes() {
        final Map<String, Object> attributes = attribute.getAttributes();
        return attributes;
    }

    /**
     * Stores an attribute on this session by delegating to the attribute holder.
     *
     * @param str the attribute key
     * @param obj the attribute value
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.ContextAttribute
    public void setAttribute(String str, Object obj) {
        final LazyContextAttribute holder = attribute;
        holder.setAttribute(str, obj);
    }

    /**
     * Reads an attribute of this session by delegating to the attribute holder.
     *
     * @param str the attribute key
     * @return whatever the attribute holder returns for that key
     */
    @Override // com.firefly.net.tcp.codec.flex.stream.ContextAttribute
    public Object getAttribute(String str) {
        final Object value = attribute.getAttribute(str);
        return value;
    }

    /**
     * Returns the TCP connection this session writes to.
     *
     * @return the connection supplied at construction time
     */
    public TcpConnection getConnection() {
        final TcpConnection underlying = connection;
        return underlying;
    }
}
