package com.firefly.net.tcp;

import com.firefly.net.Config;
import com.firefly.net.EventManager;
import com.firefly.net.ReceiveBufferPool;
import com.firefly.net.ReceiveBufferSizePredictor;
import com.firefly.net.SendBufferPool;
import com.firefly.net.Session;
import com.firefly.net.Worker;
import com.firefly.net.buffer.SocketReceiveBufferPool;
import com.firefly.net.buffer.SocketSendBufferPool;
import com.firefly.net.exception.NetException;
import com.firefly.utils.collection.LinkedTransferQueue;
import com.firefly.utils.log.Log;
import com.firefly.utils.log.LogFactory;
import com.firefly.utils.time.Millisecond100Clock;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:com/firefly/net/tcp/TcpWorker.class */
public final class TcpWorker implements Worker {
    private static Log log;
    private final Config config;
    private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue();
    private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue();
    private final Queue<SelectionKey> closeTaskQueue = new LinkedTransferQueue();
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private final ReceiveBufferPool receiveBufferPool = new SocketReceiveBufferPool();
    private final SendBufferPool sendBufferPool = new SocketSendBufferPool();
    private final Selector selector;
    private final int workerId;
    final EventManager eventManager;
    private volatile int cancelledKeys;
    private Thread thread;
    private boolean start;
    private long lastIoTimeoutCheckTime;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/firefly/net/tcp/TcpWorker$RegisterTask.class */
    private final class RegisterTask implements Runnable {
        private SocketChannel socketChannel;
        private int sessionId;

        public RegisterTask(SocketChannel socketChannel, int i) {
            this.socketChannel = socketChannel;
            this.sessionId = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            SelectionKey selectionKey = null;
            try {
                this.socketChannel.configureBlocking(false);
                this.socketChannel.socket().setReuseAddress(true);
                this.socketChannel.socket().setTcpNoDelay(false);
                this.socketChannel.socket().setKeepAlive(true);
                selectionKey = this.socketChannel.register(TcpWorker.this.selector, 1);
                TcpSession tcpSession = new TcpSession(this.sessionId, TcpWorker.this, TcpWorker.this.config, Millisecond100Clock.currentTimeMillis(), selectionKey);
                selectionKey.attach(tcpSession);
                InetSocketAddress localAddress = tcpSession.getLocalAddress();
                InetSocketAddress remoteAddress = tcpSession.getRemoteAddress();
                if (localAddress == null || remoteAddress == null) {
                    TcpWorker.this.close0(selectionKey);
                }
                TcpWorker.this.eventManager.executeOpenTask(tcpSession);
            } catch (IOException e) {
                TcpWorker.log.error("socketChannel register error", e, new Object[0]);
                TcpWorker.this.close0(selectionKey);
            }
        }
    }

    public TcpWorker(Config config, int i, EventManager eventManager) {
        try {
            this.workerId = i;
            this.config = config;
            this.eventManager = eventManager;
            this.selector = Selector.open();
            this.start = true;
            new Thread(this, "firefly-tcp-worker: " + i).start();
        } catch (IOException e) {
            log.error("worker init error", e, new Object[0]);
            throw new NetException("worker init error");
        }
    }

    @Override // com.firefly.net.Worker
    public int getWorkerId() {
        return this.workerId;
    }

    @Override // com.firefly.net.Worker
    public void registerSelectableChannel(SelectableChannel selectableChannel, int i) {
        this.registerTaskQueue.offer(new RegisterTask((SocketChannel) selectableChannel, i));
        if (this.wakenUp.compareAndSet(false, true)) {
            this.selector.wakeup();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        this.thread = Thread.currentThread();
        this.lastIoTimeoutCheckTime = Millisecond100Clock.currentTimeMillis();
        while (this.start) {
            this.wakenUp.set(false);
            try {
                select(this.selector);
                if (this.wakenUp.get()) {
                    this.selector.wakeup();
                }
                this.cancelledKeys = 0;
                processRegisterTaskQueue();
                processWriteTaskQueue();
                processSelectedKeys();
                processCloseTaskQueue();
                processTimeout();
            } catch (Throwable th) {
                log.error("Unexpected exception in the selector loop.", th, new Object[0]);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
            }
        }
    }

    private void processTimeout() {
        long currentTimeMillis = Millisecond100Clock.currentTimeMillis();
        if (currentTimeMillis - this.lastIoTimeoutCheckTime < 5000) {
            return;
        }
        Iterator<SelectionKey> it = this.selector.keys().iterator();
        while (it.hasNext()) {
            checkTimeout(it.next());
        }
        this.lastIoTimeoutCheckTime = currentTimeMillis;
    }

    private void checkTimeout(SelectionKey selectionKey) {
        TcpSession tcpSession = (TcpSession) selectionKey.attachment();
        if (tcpSession.isOpen()) {
            long currentTimeMillis = Millisecond100Clock.currentTimeMillis() - Math.max(tcpSession.getOpenTime(), tcpSession.getLastActiveTime());
            if (this.config.getTimeout() <= 0 || currentTimeMillis <= this.config.getTimeout()) {
                return;
            }
            log.debug("process timeout in select loop|{}|{}", new Object[]{Integer.valueOf(tcpSession.getSessionId()), Long.valueOf(currentTimeMillis)});
            close0(selectionKey);
        }
    }

    private void processCloseTaskQueue() throws IOException {
        while (true) {
            SelectionKey poll = this.closeTaskQueue.poll();
            if (poll == null) {
                return;
            }
            TcpSession tcpSession = (TcpSession) poll.attachment();
            log.debug("process close in queue|{}|{}", new Object[]{Integer.valueOf(tcpSession.getSessionId()), Boolean.valueOf(tcpSession.isOpen())});
            close0(poll);
            cleanUpCancelledKeys();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeFromUserCode(TcpSession tcpSession) {
        if (tcpSession.closeTaskInTaskQueue.compareAndSet(false, true)) {
            this.closeTaskQueue.offer(tcpSession.selectionKey);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void close0(SelectionKey selectionKey) {
        TcpSession tcpSession = (TcpSession) selectionKey.attachment();
        if (tcpSession.isOpen()) {
            try {
                selectionKey.channel().close();
                increaseCancelledKey();
                cleanUpWriteBuffer(tcpSession);
                this.eventManager.executeCloseTask(tcpSession);
                tcpSession.state = 0;
            } catch (IOException e) {
                log.error("channel close error", e, new Object[0]);
            }
        }
    }

    private void processWriteTaskQueue() throws IOException {
        while (true) {
            Runnable poll = this.writeTaskQueue.poll();
            if (poll == null) {
                return;
            }
            poll.run();
            cleanUpCancelledKeys();
        }
    }

    private void processSelectedKeys() throws IOException {
        int readyOps;
        Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey next = it.next();
            it.remove();
            try {
                readyOps = next.readyOps();
            } catch (CancelledKeyException e) {
                log.debug("processSelectedKeys error close session", e, new Object[0]);
                close0(next);
            }
            if (((readyOps & 1) == 0 && readyOps != 0) || read(next)) {
                if ((readyOps & 4) != 0) {
                    writeFromSelectorLoop(next);
                }
                if (cleanUpCancelledKeys()) {
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void writeFromUserCode(TcpSession tcpSession) {
        if (!tcpSession.isOpen() || scheduleWriteIfNecessary(tcpSession) || tcpSession.writeSuspended || tcpSession.inWriteNowLoop) {
            return;
        }
        log.debug("worker thread write");
        write0(tcpSession);
    }

    private boolean scheduleWriteIfNecessary(TcpSession tcpSession) {
        if (Thread.currentThread() == this.thread) {
            return false;
        }
        log.debug("schedule write >>>>");
        if (tcpSession.writeTaskInTaskQueue.compareAndSet(false, true)) {
            boolean offer = this.writeTaskQueue.offer(tcpSession.writeTask);
            if (!$assertionsDisabled && !offer) {
                throw new AssertionError();
            }
        }
        if (!this.wakenUp.compareAndSet(false, true)) {
            return true;
        }
        this.selector.wakeup();
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void writeFromTaskLoop(TcpSession tcpSession) {
        if (tcpSession.writeSuspended) {
            return;
        }
        write0(tcpSession);
    }

    private void writeFromSelectorLoop(SelectionKey selectionKey) {
        TcpSession tcpSession = (TcpSession) selectionKey.attachment();
        tcpSession.writeSuspended = false;
        write0(tcpSession);
    }

    private void write0(TcpSession tcpSession) {
        if (tcpSession.isOpen()) {
            boolean z = true;
            boolean z2 = false;
            boolean z3 = false;
            long j = 0;
            SocketChannel socketChannel = (SocketChannel) tcpSession.selectionKey.channel();
            Queue<Object> queue = tcpSession.writeBuffer;
            tcpSession.inWriteNowLoop = true;
            while (true) {
                Object obj = tcpSession.currentWrite;
                SocketSendBufferPool.SendBuffer sendBuffer = null;
                if (obj == null) {
                    Object poll = queue.poll();
                    tcpSession.currentWrite = poll;
                    if (tcpSession.currentWrite == null) {
                        z3 = true;
                        tcpSession.writeSuspended = false;
                        break;
                    } else if (poll == Session.CLOSE_FLAG) {
                        z = false;
                    } else {
                        sendBuffer = this.sendBufferPool.acquire(poll);
                        tcpSession.currentWriteBuffer = sendBuffer;
                    }
                } else if (obj == Session.CLOSE_FLAG) {
                    z = false;
                } else {
                    sendBuffer = tcpSession.currentWriteBuffer;
                }
                try {
                    log.debug("0> session is open: {}", new Object[]{Boolean.valueOf(z)});
                } catch (AsynchronousCloseException e) {
                } catch (Throwable th) {
                    if (sendBuffer != null) {
                        sendBuffer.release();
                    }
                    tcpSession.resetCurrentWriteAndWriteBuffer();
                    this.eventManager.executeExceptionTask(tcpSession, th);
                    if (th instanceof IOException) {
                        log.debug("write0 IOException session close");
                        z = false;
                        close0(tcpSession.selectionKey);
                    }
                }
                if (z) {
                    int i = 16;
                    while (true) {
                        if (i <= 0) {
                            break;
                        }
                        long transferTo = sendBuffer.transferTo(socketChannel);
                        if (transferTo != 0) {
                            j += transferTo;
                            break;
                        } else if (sendBuffer.finished()) {
                            break;
                        } else {
                            i--;
                        }
                    }
                    if (!sendBuffer.finished()) {
                        z2 = true;
                        tcpSession.writeSuspended = true;
                        break;
                    } else {
                        sendBuffer.release();
                        tcpSession.resetCurrentWriteAndWriteBuffer();
                    }
                } else {
                    log.debug("receive close flag");
                    if (!$assertionsDisabled && sendBuffer != null) {
                        throw new AssertionError();
                    }
                    tcpSession.resetCurrentWriteAndWriteBuffer();
                    clearOpWrite(tcpSession);
                    close0(tcpSession.selectionKey);
                }
            }
            tcpSession.inWriteNowLoop = false;
            if (z) {
                if (z2) {
                    setOpWrite(tcpSession);
                } else if (z3) {
                    clearOpWrite(tcpSession);
                }
            }
            if (j > 0) {
                tcpSession.lastWrittenTime = Millisecond100Clock.currentTimeMillis();
                tcpSession.writtenBytes += j;
                log.debug("write complete size: {}", new Object[]{Long.valueOf(j)});
                log.debug("1> session is open: {}", new Object[]{Boolean.valueOf(z)});
                log.debug("is in write loop: {}", new Object[]{Boolean.valueOf(tcpSession.inWriteNowLoop)});
            }
        }
    }

    private void setOpWrite(TcpSession tcpSession) {
        SelectionKey selectionKey = tcpSession.selectionKey;
        if (selectionKey == null) {
            return;
        }
        if (!selectionKey.isValid()) {
            log.debug("setOpWrite failure session close");
            close0(selectionKey);
            return;
        }
        int i = tcpSession.interestOps;
        if ((i & 4) == 0) {
            int i2 = i | 4;
            selectionKey.interestOps(i2);
            tcpSession.interestOps = i2;
        }
    }

    private void clearOpWrite(TcpSession tcpSession) {
        SelectionKey selectionKey = tcpSession.selectionKey;
        if (selectionKey == null) {
            return;
        }
        if (!selectionKey.isValid()) {
            log.debug("clearOpWrite key valid false");
            close0(selectionKey);
            return;
        }
        int i = tcpSession.interestOps;
        if ((i & 4) != 0) {
            int i2 = i & (-5);
            log.debug("clear write op >>> {}", new Object[]{Integer.valueOf(i2)});
            selectionKey.interestOps(i2);
            tcpSession.interestOps = i2;
        }
    }

    private void cleanUpWriteBuffer(TcpSession tcpSession) {
        NetException netException = null;
        boolean z = false;
        if (tcpSession.currentWrite != null) {
            netException = new NetException("cleanUpWriteBuffer error");
            tcpSession.currentWriteBuffer.release();
            tcpSession.resetCurrentWriteAndWriteBuffer();
            z = true;
        }
        Queue<Object> queue = tcpSession.writeBuffer;
        if (!queue.isEmpty()) {
            if (netException == null) {
                netException = new NetException("cleanUpWriteBuffer error");
            }
            while (true) {
                Object poll = queue.poll();
                if (poll == null) {
                    break;
                }
                log.warn("error clear obj: {}", new Object[]{poll.getClass().toString()});
                z = true;
            }
        }
        if (z) {
            this.eventManager.executeExceptionTask(tcpSession, netException);
        }
    }

    private boolean read(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        TcpSession tcpSession = (TcpSession) selectionKey.attachment();
        ReceiveBufferSizePredictor receiveBufferSizePredictor = tcpSession.receiveBufferSizePredictor;
        int i = 0;
        int i2 = 0;
        boolean z = true;
        ByteBuffer acquire = this.receiveBufferPool.acquire(receiveBufferSizePredictor.nextReceiveBufferSize());
        do {
            try {
                int read = socketChannel.read(acquire);
                i = read;
                if (read <= 0) {
                    break;
                }
                i2 += i;
            } catch (ClosedChannelException e) {
            } catch (Throwable th) {
                this.eventManager.executeExceptionTask(tcpSession, th);
            }
        } while (acquire.hasRemaining());
        z = false;
        if (i2 > 0) {
            acquire.flip();
            this.receiveBufferPool.release(acquire);
            receiveBufferSizePredictor.previousReceiveBufferSize(i2);
            tcpSession.readBytes += i2;
            tcpSession.lastReadTime = Millisecond100Clock.currentTimeMillis();
            try {
                this.config.getDecoder().decode(acquire, tcpSession);
            } catch (Throwable th2) {
                this.eventManager.executeExceptionTask(tcpSession, th2);
            }
        } else {
            this.receiveBufferPool.release(acquire);
        }
        if (i >= 0 && !z) {
            return true;
        }
        log.debug("read failure session close");
        selectionKey.cancel();
        close0(selectionKey);
        return false;
    }

    private void processRegisterTaskQueue() throws IOException {
        while (true) {
            Runnable poll = this.registerTaskQueue.poll();
            if (poll == null) {
                return;
            }
            poll.run();
            cleanUpCancelledKeys();
        }
    }

    static void select(Selector selector) throws IOException {
        try {
            selector.select(500L);
        } catch (CancelledKeyException e) {
            log.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e, new Object[0]);
        }
    }

    private boolean cleanUpCancelledKeys() throws IOException {
        if (this.cancelledKeys < 256) {
            return false;
        }
        this.cancelledKeys = 0;
        this.selector.selectNow();
        return true;
    }

    private void increaseCancelledKey() {
        this.cancelledKeys++;
    }

    @Override // com.firefly.net.Worker
    public void shutdown() {
        this.eventManager.shutdown();
        this.start = false;
        log.debug("thread {} is shutdown: {}", new Object[]{this.thread.getName(), Boolean.valueOf(this.thread.isInterrupted())});
    }

    static {
        $assertionsDisabled = !TcpWorker.class.desiredAssertionStatus();
        log = LogFactory.getInstance().getLog("firefly-system");
    }
}
