package io.dingodb.net.netty;

import io.dingodb.common.Location;
import io.dingodb.common.codec.PrimitiveCodec;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.concurrent.LinkedRunner;
import io.dingodb.common.util.DebugLog;
import io.dingodb.common.util.NoBreakFunctions;
import io.dingodb.expr.runtime.op.OpSymbol;
import io.dingodb.net.netty.api.ApiRegistryImpl;
import io.dingodb.net.netty.api.AuthProxyApi;
import io.dingodb.net.netty.api.HandshakeApi;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/dingodb/net/netty/Connection.class */
public class Connection {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Connection.class);
    private final String channelType;
    private final long direction;
    private final Location remote;
    private final SocketChannel socket;
    private Map<String, Object[]> authContent;
    private ScheduledFuture<?> heartbeatFuture;
    private final Map<Long, Channel> channels = new ConcurrentHashMap();
    private final AtomicLong channelIdSeq = new AtomicLong(0);
    private final List<Consumer<Connection>> closeListeners = new ArrayList();
    private final Channel channel = createChannel(0);

    public Connection(String str, Location location, SocketChannel socketChannel) {
        this.channelType = str;
        this.socket = socketChannel;
        this.remote = location;
        this.channels.put(0L, this.channel);
        this.direction = Constant.CLIENT.equals(str) ? 0L : Long.MIN_VALUE;
    }

    protected Channel getChannel(long j) {
        Channel channel = this.channels.get(Long.valueOf(j));
        if (channel == null) {
            if (this.direction == Long.MIN_VALUE) {
                if ((Long.MIN_VALUE & j) != Long.MIN_VALUE) {
                    channel = createChannel(j);
                }
            } else if ((Long.MIN_VALUE & j) == Long.MIN_VALUE) {
                channel = createChannel(j);
            }
        }
        return channel;
    }

    private void removeChannel(long j) {
        this.channels.remove(Long.valueOf(j));
        DebugLog.debug(log, "Removed channel {} to remote \"{}\". # of channels: {}", Long.valueOf(j), this.remote.url(), Integer.valueOf(this.channels.size()));
    }

    protected Channel createChannel(long j) {
        DebugLog.debug(log, "Create channel {} to remote \"{}\". # of channels: {}", Long.valueOf(j), this.remote.url(), Integer.valueOf(this.channels.size()));
        return this.channels.computeIfAbsent(Long.valueOf(j), l -> {
            return new Channel(j, this, new LinkedRunner(fmtName(this.remote.url(), j)), (v1) -> {
                removeChannel(v1);
            });
        });
    }

    private String fmtName(String str, long j) {
        StringBuilder sb = new StringBuilder("<");
        sb.append(str).append(OpSymbol.DIV).append(j).append(OpSymbol.DIV).append(this.channelType).append(">");
        return sb.toString();
    }

    private void sendHeartbeat() {
        this.channel.sendAsync(this.channel.buffer((byte) 2, 1).writeByte(1));
    }

    public void handshake() {
        ((HandshakeApi) ApiRegistryImpl.instance().proxy(HandshakeApi.class, this.channel)).handshake(null, HandshakeApi.Handshake.INSTANCE);
        log.info("Connection handshake success, remote: [{}]", this.remote.url());
    }

    public void handshake(ByteBuffer byteBuffer) {
        if (byteBuffer.getLong() == 0 && byteBuffer.get() == 3 && Constant.HANDSHAKE.equals(PrimitiveCodec.readString(byteBuffer))) {
            ApiRegistryImpl.instance().invoke(Constant.HANDSHAKE, this.channel, byteBuffer);
        } else {
            log.error("Illegal connection [{}].", this.remote.url());
            close();
        }
    }

    public void auth() {
        this.authContent = AuthProxyApi.auth(this.channel);
        log.info("Connection auth success, remote: [{}]", this.remote.url());
        this.heartbeatFuture = Executors.scheduleWithFixedDelayAsync(String.format("%s-heartbeat", this.remote.url()), this::sendHeartbeat, 0L, 1L, TimeUnit.SECONDS);
    }

    public void auth(ByteBuffer byteBuffer) {
        if (byteBuffer.getLong() == 0 && byteBuffer.get() == 3 && Constant.AUTH.equals(PrimitiveCodec.readString(byteBuffer))) {
            this.authContent = (Map) ApiRegistryImpl.instance().invoke(Constant.AUTH, this.channel, byteBuffer);
        } else {
            log.error("Illegal connection [{}].", this.remote.url());
            close();
        }
    }

    public void receive(ByteBuffer byteBuffer) {
        if (byteBuffer == null) {
            return;
        }
        long j = byteBuffer.getLong();
        Channel channel = getChannel(j);
        if (channel == null) {
            log.error("Receive message, channel id is [{}], but not have channel.", Long.valueOf(j));
        } else {
            channel.receive(byteBuffer);
        }
    }

    public void send(ByteBuf byteBuf) throws InterruptedException {
        if (this.channel.isClosed()) {
            throw new RuntimeException("Connection closed.");
        }
        this.socket.writeAndFlush(byteBuf).await2();
    }

    public void sendAsync(ByteBuf byteBuf) {
        if (this.channel.isClosed()) {
            throw new RuntimeException("Connection closed.");
        }
        this.socket.writeAndFlush(byteBuf);
    }

    public Channel newChannel() {
        return createChannel(this.channelIdSeq.incrementAndGet() | this.direction);
    }

    public synchronized void addCloseListener(Consumer<Connection> consumer) {
        this.closeListeners.add(consumer);
    }

    public void close() {
        DebugLog.debug(log, "Close connection to [{}].", this.remote);
        if (this.heartbeatFuture != null) {
            this.heartbeatFuture.cancel(true);
        }
        this.channel.shutdown();
        this.channels.values().forEach((v0) -> {
            v0.shutdown();
        });
        this.channels.clear();
        if (this.authContent != null) {
            this.authContent.clear();
        }
        if (this.socket.isActive()) {
            this.socket.disconnect();
        }
        this.closeListeners.forEach(NoBreakFunctions.wrap(consumer -> {
            consumer.accept(this);
        }));
        DebugLog.debug(log, "Closed connection to [{}].", this.remote);
    }

    public Channel channel() {
        return this.channel;
    }

    public Location remote() {
        return this.remote;
    }

    public SocketChannel socket() {
        return this.socket;
    }

    public Map<String, Object[]> authContent() {
        return this.authContent;
    }

    public ServerSocketChannel parent() {
        return socket().parent();
    }

    public SocketChannelConfig config() {
        return socket().config();
    }

    public InetSocketAddress localAddress() {
        return socket().localAddress();
    }

    public InetSocketAddress remoteAddress() {
        return socket().remoteAddress();
    }

    public boolean isInputShutdown() {
        return socket().isInputShutdown();
    }

    public ChannelFuture shutdownInput() {
        return socket().shutdownInput();
    }

    public ChannelFuture shutdownInput(ChannelPromise channelPromise) {
        return socket().shutdownInput(channelPromise);
    }

    public boolean isOutputShutdown() {
        return socket().isOutputShutdown();
    }

    public ChannelFuture shutdownOutput() {
        return socket().shutdownOutput();
    }

    public ChannelFuture shutdownOutput(ChannelPromise channelPromise) {
        return socket().shutdownOutput(channelPromise);
    }

    public boolean isShutdown() {
        return socket().isShutdown();
    }

    public ChannelFuture shutdown() {
        return socket().shutdown();
    }

    public ChannelFuture shutdown(ChannelPromise channelPromise) {
        return socket().shutdown(channelPromise);
    }

    public ChannelId id() {
        return socket().id();
    }

    public EventLoop eventLoop() {
        return socket().eventLoop();
    }

    public boolean isOpen() {
        return socket().isOpen();
    }

    public boolean isRegistered() {
        return socket().isRegistered();
    }

    public boolean isActive() {
        return socket().isActive();
    }

    public ChannelMetadata metadata() {
        return socket().metadata();
    }

    public ChannelFuture closeFuture() {
        return socket().closeFuture();
    }

    public boolean isWritable() {
        return socket().isWritable();
    }

    public long bytesBeforeUnwritable() {
        return socket().bytesBeforeUnwritable();
    }

    public long bytesBeforeWritable() {
        return socket().bytesBeforeWritable();
    }

    public Channel.Unsafe unsafe() {
        return socket().unsafe();
    }

    public ChannelPipeline pipeline() {
        return socket().pipeline();
    }

    public ByteBufAllocator alloc() {
        return socket().alloc();
    }

    public int compareTo(io.netty.channel.Channel channel) {
        return socket().compareTo(channel);
    }
}
