package io.craft.atom.nio;

import io.craft.atom.io.ChannelEventType;
import io.craft.atom.io.IoHandler;
import io.craft.atom.io.IoProcessor;
import io.craft.atom.io.IoProcessorX;
import io.craft.atom.io.IoProtocol;
import io.craft.atom.nio.spi.NioChannelEventDispatcher;
import io.craft.atom.util.thread.NamedThreadFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/craft/atom/nio/NioProcessor.class */
public class NioProcessor extends NioReactor implements IoProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(NioProcessor.class);

    /** Upper bound on the number of channels flushed during one pass of {@code flush()}. */
    private static final long FLUSH_SPIN_COUNT = 256;

    /** Timeout, in milliseconds, handed to {@code Selector.select(long)}. */
    private static final long SELECT_TIMEOUT = 1000;

    /** Idle timer that channels are added to on registration and removed from on close. */
    private final NioChannelIdleTimer idleTimer;

    /** Configuration; consulted for the read/write fairness flag when flushing. */
    private final NioConfig config;

    /** Executor that runs the single {@link ProcessThread} of this processor. */
    private final Executor executor;

    /** Transport protocol, assigned through {@code setProtocol}; selects the TCP or UDP read/write path. */
    private IoProtocol protocol;

    /** Current selector; volatile because it can be replaced while other threads call {@code wakeup()}. */
    private volatile Selector selector;

    /** Channels waiting to be registered with the selector. */
    private final Queue<NioByteChannel> newChannels = new ConcurrentLinkedQueue<>();

    /** Channels with a pending flush request. */
    private final Queue<NioByteChannel> flushingChannels = new ConcurrentLinkedQueue<>();

    /** Channels waiting to be closed. */
    private final Queue<NioByteChannel> closingChannels = new ConcurrentLinkedQueue<>();

    /** UDP channels keyed by the string built by {@code udpChannelKey(local, remote)}. */
    private final Map<String, NioByteChannel> udpChannels = new ConcurrentHashMap<>();

    /** Holds the process thread once it has been started; {@code null} until then. */
    private final AtomicReference<ProcessThread> processThreadRef = new AtomicReference<>();

    /** Allocates the buffers used for reads. */
    private final NioByteBufferAllocator allocator = new NioByteBufferAllocator();

    /** Set to {@code true} by {@code wakeup()}; read by {@code select()}. */
    private final AtomicBoolean wakeupCalled = new AtomicBoolean(false);

    /** Becomes {@code true} once {@code shutdown()} has been requested. */
    private volatile boolean shutdown = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/craft/atom/nio/NioProcessor$ProcessThread.class */
    public class ProcessThread implements Runnable {
        private ProcessThread() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!NioProcessor.this.shutdown) {
                try {
                    int select = NioProcessor.this.select();
                    NioProcessor.this.flush();
                    NioProcessor.this.register();
                    if (select > 0) {
                        NioProcessor.this.process();
                    }
                    NioProcessor.this.close();
                } catch (Exception e) {
                    NioProcessor.LOG.error("[CRAFT-ATOM-NIO] Process exception", e);
                }
            }
            if (NioProcessor.this.shutdown) {
                try {
                    NioProcessor.this.shutdown0();
                } catch (Exception e2) {
                    NioProcessor.LOG.error("[CRAFT-ATOM-NIO] Shutdown exception", e2);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NioProcessor(NioConfig nioConfig, IoHandler ioHandler, NioChannelEventDispatcher nioChannelEventDispatcher, NioChannelIdleTimer nioChannelIdleTimer) {
        this.config = nioConfig;
        this.handler = ioHandler;
        this.dispatcher = nioChannelEventDispatcher;
        this.idleTimer = nioChannelIdleTimer;
        this.executor = Executors.newCachedThreadPool(new NamedThreadFactory("craft-atom-nio-processor"));
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            throw new RuntimeException("Fail to startup a processor", e);
        }
    }

    /**
     * Queues a channel for registration, starts the process thread if it is not
     * running yet and wakes the selector up. A {@code null} channel is logged
     * and ignored.
     *
     * @param nioByteChannel channel to add; may be {@code null}
     * @throws IllegalStateException if the processor has been shut down
     */
    public void add(NioByteChannel nioByteChannel) {
        if (this.shutdown) {
            throw new IllegalStateException("The processor already shutdown!");
        }

        if (nioByteChannel != null) {
            this.newChannels.add(nioByteChannel);
            startup();
            wakeup();
        } else {
            LOG.debug("[CRAFT-ATOM-NIO] Add channel is null, return");
        }
    }

    /**
     * Starts the process thread at most once: only the caller that wins the
     * compare-and-set on {@code processThreadRef} submits it to the executor.
     */
    private void startup() {
        if (this.processThreadRef.get() != null) {
            return;
        }
        ProcessThread worker = new ProcessThread();
        if (!this.processThreadRef.compareAndSet(null, worker)) {
            return;
        }
        this.executor.execute(worker);
    }

    /** Records that a wakeup was requested, then wakes the selector up. */
    private void wakeup() {
        // The flag is raised before the selector is woken so the loop can see it.
        wakeupCalled.set(true);
        selector.wakeup();
    }

    /**
     * Requests shutdown: raises the shutdown flag and wakes the selector so the
     * process loop can observe it.
     */
    @Override
    public void shutdown() {
        shutdown = true;
        wakeup();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Releases the processor's resources: every channel still waiting for
     * registration or flush is moved to the closing queue, all queued channels
     * are closed, and finally the selector is closed.
     *
     * @throws IOException if closing the channels or the selector fails
     */
    public void shutdown0() throws IOException {
        drainIntoClosing(this.newChannels);
        drainIntoClosing(this.flushingChannels);
        close();
        this.selector.close();
        LOG.debug("[CRAFT-ATOM-NIO] Shutdown processor successful");
    }

    /** Copies every channel of {@code pending} into the closing queue, then empties {@code pending}. */
    private void drainIntoClosing(Queue<NioByteChannel> pending) {
        this.closingChannels.addAll(pending);
        pending.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes every channel currently waiting in the closing queue.
     *
     * <p>Each polled channel is first removed from the idle timer. A channel
     * that is already closed is skipped; otherwise it is marked closing, closed,
     * marked closed, and a channel-closed event is fired.
     *
     * @throws IOException declared by the per-channel close helper
     */
    public void close() throws IOException {
        NioByteChannel channel;
        while ((channel = this.closingChannels.poll()) != null) {
            this.idleTimer.remove(channel);
            if (channel.isClosed()) {
                LOG.debug("[CRAFT-ATOM-NIO] Skip close because it is already closed, |channel={}|", channel);
                continue;
            }

            channel.setClosing();
            LOG.debug("[CRAFT-ATOM-NIO] Closing |channel={}|", channel);
            close(channel);
            channel.setClosed();
            fireChannelClosed(channel);
            // Pass the channel as a parameter: concatenating it left the "{}"
            // placeholder unfilled and built the string even with debug off.
            LOG.debug("[CRAFT-ATOM-NIO] Closed |channel={}|", channel);
        }
    }

    /**
     * Closes one channel and, for UDP, drops its entry from the UDP channel
     * map. Any exception is logged and dispatched as a channel-thrown event
     * instead of being propagated.
     *
     * @param nioByteChannel channel to close
     * @throws IOException declared for callers; exceptions raised here are caught
     */
    private void close(NioByteChannel nioByteChannel) throws IOException {
        try {
            nioByteChannel.close0();
            if (this.protocol != IoProtocol.UDP) {
                return;
            }
            String key = udpChannelKey(nioByteChannel.getLocalAddress(), nioByteChannel.getRemoteAddress());
            this.udpChannels.remove(key);
        } catch (Exception cause) {
            LOG.warn("[CRAFT-ATOM-NIO] Catch close exception and fire it, |channel={}|", nioByteChannel, cause);
            fireChannelThrown(nioByteChannel, cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Performs one timed select and guards against a selector that returns
     * immediately without any ready key.
     *
     * <p>If the select produced no key, was not caused by {@code wakeup()} and
     * returned in under 100 ms, the registered channels are checked for broken
     * connections; when none is found, the selector is replaced by a new one.
     *
     * <p>The wakeup flag is consumed (read and reset) on every call. Previously
     * it was reset only inside a branch that required it to be {@code false}
     * already, so after the first {@code wakeup()} it stayed {@code true} and
     * the guard never ran again.
     *
     * @return the number of keys reported by the select
     * @throws IOException if selecting or rebuilding the selector fails
     */
    public int select() throws IOException {
        long startMillis = System.currentTimeMillis();
        int selected = this.selector.select(SELECT_TIMEOUT);
        // Consume the flag right after select() so a later wakeup() is not lost.
        boolean wokenUp = this.wakeupCalled.getAndSet(false);
        long elapsedMillis = System.currentTimeMillis() - startMillis;

        if (selected == 0 && !wokenUp && elapsedMillis < 100) {
            if (isBrokenConnection()) {
                LOG.debug("[CRAFT-ATOM-NIO] Broken connection wakeup");
            } else {
                LOG.debug("[CRAFT-ATOM-NIO] Create a new selector, |selected={}, delta={}|", Integer.valueOf(selected), Long.valueOf(elapsedMillis));
                registerNewSelector();
            }
        }
        return selected;
    }

    /**
     * Replaces the current selector with a freshly opened one, re-registering
     * every valid key with the same interest set and attachment.
     *
     * <p>Each channel is handed its new selection key; without that step the
     * channels would keep keys that become invalid when the old selector is
     * closed. If re-registration fails, the new selector is closed and the old
     * one stays in place.
     *
     * @throws IOException if opening the new selector, registering a channel or
     *                     closing the old selector fails
     */
    private void registerNewSelector() throws IOException {
        synchronized (this) {
            Selector replacement = Selector.open();
            try {
                for (SelectionKey oldKey : this.selector.keys()) {
                    // interestOps() throws on a cancelled key, so skip those.
                    if (oldKey.isValid()) {
                        oldKey.channel().register(replacement, oldKey.interestOps(), oldKey.attachment());
                    }
                }
            } catch (IOException | RuntimeException failure) {
                replacement.close();
                throw failure;
            }

            // All registrations succeeded: publish the new keys to their channels.
            for (SelectionKey newKey : replacement.keys()) {
                ((NioByteChannel) newKey.attachment()).setSelectionKey(newKey);
            }

            // Swap before closing so wakeup() never targets a closed selector.
            Selector previous = this.selector;
            this.selector = replacement;
            previous.close();
        }
    }

    /**
     * Cancels the key of every registered socket channel that is no longer
     * connected. Channels that are not {@link SocketChannel}s are left alone
     * (the former unconditional cast threw {@code ClassCastException} for them).
     *
     * @return {@code true} if at least one key was cancelled
     * @throws IOException declared for callers
     */
    private boolean isBrokenConnection() throws IOException {
        boolean broken = false;
        synchronized (this.selector) {
            for (SelectionKey key : this.selector.keys()) {
                if (key.channel() instanceof SocketChannel && !((SocketChannel) key.channel()).isConnected()) {
                    key.cancel();
                    broken = true;
                }
            }
        }
        return broken;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Registers every queued new channel with the selector for read events,
     * adds it to the idle timer and fires a channel-opened event.
     *
     * @throws ClosedChannelException if a channel is closed at registration time
     */
    public void register() throws ClosedChannelException {
        for (NioByteChannel channel = this.newChannels.poll(); channel != null; channel = this.newChannels.poll()) {
            SelectionKey key = channel.innerChannel().register(this.selector, SelectionKey.OP_READ, channel);
            channel.setSelectionKey(key);
            this.idleTimer.add(channel);
            fireChannelOpened(channel);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Walks the selected-key set: handles the attached channel of each key when
     * the channel is valid, logs it otherwise, and removes the key from the set.
     */
    public void process() {
        Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
        // The loop's update step removes the key once it has been handled.
        for (Iterator<SelectionKey> keys = selectedKeys.iterator(); keys.hasNext(); keys.remove()) {
            NioByteChannel channel = (NioByteChannel) keys.next().attachment();
            if (!channel.isValid()) {
                LOG.debug("[CRAFT-ATOM-NIO] Channel is invalid, |channel={}|", channel);
            } else {
                process0(channel);
            }
        }
    }

    /**
     * Handles the ready operations of one channel: stamps its last I/O time,
     * reads when it is readable, then schedules a flush when it is writable.
     *
     * @param nioByteChannel channel whose key was selected
     */
    private void process0(NioByteChannel nioByteChannel) {
        long now = System.currentTimeMillis();
        nioByteChannel.setLastIoTime(now);

        boolean readable = nioByteChannel.isReadable();
        if (readable) {
            LOG.debug("[CRAFT-ATOM-NIO] Read event process on |channel={}|", nioByteChannel);
            read(nioByteChannel);
        }

        // Writability is queried only after the read has been handled.
        boolean writable = nioByteChannel.isWritable();
        if (writable) {
            LOG.debug("[CRAFT-ATOM-NIO] Write event process on |channel={}|", nioByteChannel);
            scheduleFlush(nioByteChannel);
        }
    }

    /**
     * Reads from a channel into a buffer sized by the channel's predictor,
     * using the TCP or UDP path according to the configured protocol.
     *
     * <p>Any exception is logged and dispatched as a channel-thrown event; an
     * {@link IOException} additionally schedules the channel for closing. The
     * buffer is cleared afterwards when bytes were read.
     *
     * @param nioByteChannel channel to read from
     */
    private void read(NioByteChannel nioByteChannel) {
        int predictedSize = nioByteChannel.getPredictor().next();
        ByteBuffer buffer = this.allocator.allocate(predictedSize);
        LOG.debug("[CRAFT-ATOM-NIO] Predict buffer |size={}, buffer={}|", Integer.valueOf(predictedSize), buffer);

        int readBytes = 0;
        try {
            if (this.protocol.equals(IoProtocol.TCP)) {
                readBytes = readTcp(nioByteChannel, buffer);
            } else if (this.protocol.equals(IoProtocol.UDP)) {
                readBytes = readUdp(nioByteChannel, buffer);
            }
        } catch (Exception cause) {
            LOG.debug("[CRAFT-ATOM-NIO] Catch read exception and fire it, |channel={}|", nioByteChannel, cause);
            fireChannelThrown(nioByteChannel, cause);
            // An I/O failure will not heal by itself: close the channel.
            if (cause instanceof IOException) {
                scheduleClose(nioByteChannel);
            }
        } finally {
            // readBytes stays 0 whenever the read itself threw.
            if (readBytes > 0) {
                buffer.clear();
            }
        }
    }

    /**
     * Reads repeatedly from a TCP channel until a read returns no bytes or the
     * buffer is full. When bytes were read, the predictor is informed and a
     * channel-read event is fired. A negative read result schedules the channel
     * for closing.
     *
     * @param nioByteChannel channel to read from
     * @param byteBuffer     destination buffer
     * @return total number of bytes read
     * @throws IOException if the channel read fails
     */
    private int readTcp(NioByteChannel nioByteChannel, ByteBuffer byteBuffer) throws IOException {
        int total = 0;
        int lastRead;
        do {
            lastRead = nioByteChannel.readTcp(byteBuffer);
            if (lastRead > 0) {
                total += lastRead;
            }
        } while (lastRead > 0 && byteBuffer.hasRemaining());

        if (total > 0) {
            nioByteChannel.getPredictor().previous(total);
            fireChannelRead(nioByteChannel, byteBuffer, total);
            LOG.debug("[CRAFT-ATOM-NIO] Actual |readBytes={}|", Integer.valueOf(total));
        }

        if (lastRead < 0) {
            scheduleClose(nioByteChannel);
        }
        return total;
    }

    /**
     * Queues a channel for closing unless it is already closing or closed.
     *
     * @param nioByteChannel channel to close later
     */
    private void scheduleClose(NioByteChannel nioByteChannel) {
        if (!nioByteChannel.isClosing() && !nioByteChannel.isClosed()) {
            this.closingChannels.add(nioByteChannel);
        }
    }

    /**
     * Receives one datagram. When a sender address is returned, the channel is
     * recorded in the UDP channel map under its local/remote key if that key is
     * new, the last I/O time is updated and a channel-read event is fired.
     *
     * @param nioByteChannel channel to read from
     * @param byteBuffer     destination buffer
     * @return the buffer position after the read, or 0 when no sender address was returned
     * @throws IOException if the channel read fails
     */
    private int readUdp(NioByteChannel nioByteChannel, ByteBuffer byteBuffer) throws IOException {
        SocketAddress sender = nioByteChannel.readUdp(byteBuffer);
        if (sender == null) {
            return 0;
        }

        final int received = byteBuffer.position();
        final String key = udpChannelKey(nioByteChannel.getLocalAddress(), sender);
        boolean known = this.udpChannels.containsKey(key);
        if (!known) {
            nioByteChannel.setRemoteAddress(sender);
            this.udpChannels.put(key, nioByteChannel);
        }

        nioByteChannel.setLastIoTime(System.currentTimeMillis());
        fireChannelRead(nioByteChannel, byteBuffer, received);
        return received;
    }

    /**
     * Builds the UDP channel map key: the two addresses' string forms joined by {@code "-"}.
     *
     * @param socketAddress  first address (callers pass the local address)
     * @param socketAddress2 second address (callers pass the remote address)
     * @return {@code socketAddress + "-" + socketAddress2}
     */
    private String udpChannelKey(SocketAddress socketAddress, SocketAddress socketAddress2) {
        String first = socketAddress.toString();
        String second = socketAddress2.toString();
        return first + "-" + second;
    }

    /**
     * Requests a flush of the given channel and wakes the selector up.
     * A {@code null} channel is ignored.
     *
     * @param nioByteChannel channel to flush; may be {@code null}
     * @throws IllegalStateException if the processor has been shut down
     */
    public void flush(NioByteChannel nioByteChannel) {
        if (this.shutdown) {
            throw new IllegalStateException("The processor is already shutdown!");
        }
        if (nioByteChannel != null) {
            scheduleFlush(nioByteChannel);
            wakeup();
        }
    }

    /**
     * Adds the channel to the flushing queue when {@code setScheduleFlush(true)}
     * on the channel returns {@code true}.
     *
     * @param nioByteChannel channel to flush later
     */
    private void scheduleFlush(NioByteChannel nioByteChannel) {
        boolean accepted = nioByteChannel.setScheduleFlush(true);
        if (!accepted) {
            return;
        }
        this.flushingChannels.add(nioByteChannel);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Flushes queued channels until the queue is empty or
     * {@code FLUSH_SPIN_COUNT} channels have been flushed. Channels that are
     * closing or closed are only logged and do not count. An exception is
     * dispatched as a channel-thrown event, and an {@link IOException}
     * additionally schedules the channel for closing.
     */
    public void flush() {
        int flushed = 0;
        while (flushed < FLUSH_SPIN_COUNT) {
            NioByteChannel channel = this.flushingChannels.poll();
            if (channel == null) {
                // Queue drained.
                break;
            }

            channel.unsetScheduleFlush();
            try {
                if (!channel.isClosed() && !channel.isClosing()) {
                    flushed++;
                    flush0(channel);
                } else {
                    LOG.debug("[CRAFT-ATOM-NIO] Channel is closing or closed, |Channel={}, flushing-channel-size={}|", channel, Integer.valueOf(this.flushingChannels.size()));
                }
            } catch (Exception cause) {
                LOG.debug("[CRAFT-ATOM-NIO] Catch flush exception and fire it", cause);
                fireChannelThrown(channel, cause);
                if (cause instanceof IOException) {
                    scheduleClose(channel);
                }
            }
        }
    }

    /**
     * Flushes one channel: write interest is dropped, the write queue is
     * flushed with the fair or the one-off strategy according to the
     * configuration, and if buffers remain, write interest is restored and
     * another flush is scheduled.
     *
     * @param nioByteChannel channel to flush
     * @throws IOException if writing to the channel fails
     */
    private void flush0(NioByteChannel nioByteChannel) throws IOException {
        LOG.debug("[CRAFT-ATOM-NIO] Flushing |channel={}|", nioByteChannel);
        final Queue<ByteBuffer> pending = nioByteChannel.getWriteBufferQueue();
        setInterestedInWrite(nioByteChannel, false);

        boolean fair = this.config.isReadWritefair();
        if (!fair) {
            oneOffFlush0(nioByteChannel, pending);
        } else {
            fairFlush0(nioByteChannel, pending);
        }

        if (!pending.isEmpty()) {
            setInterestedInWrite(nioByteChannel, true);
            scheduleFlush(nioByteChannel);
        }
    }

    /**
     * Attempts to write the head buffer of the queue in one go. A fully written
     * buffer is removed and reported with a channel-written event; otherwise
     * write interest is restored and another flush is scheduled.
     *
     * @param nioByteChannel channel to write to
     * @param queue          the channel's write buffer queue
     * @throws IOException if writing to the channel fails
     */
    private void oneOffFlush0(NioByteChannel nioByteChannel, Queue<ByteBuffer> queue) throws IOException {
        final ByteBuffer head = queue.peek();
        if (head != null) {
            fireChannelFlush(nioByteChannel, head);
            write(nioByteChannel, head, head.remaining());

            if (!head.hasRemaining()) {
                queue.remove();
                fireChannelWritten(nioByteChannel, head);
            } else {
                setInterestedInWrite(nioByteChannel, true);
                scheduleFlush(nioByteChannel);
            }
        }
    }

    /**
     * Writes queued buffers until the channel's maximum write size is reached,
     * the queue is empty, or a buffer cannot be written completely.
     *
     * <p>A fully written buffer is removed from the queue and reported with a
     * channel-written event. When a buffer is left with remaining bytes, write
     * interest is restored, another flush is scheduled and the method returns.
     *
     * @param nioByteChannel channel to write to
     * @param queue          the channel's write buffer queue
     * @throws IOException if writing to the channel fails
     */
    private void fairFlush0(NioByteChannel nioByteChannel, Queue<ByteBuffer> queue) throws IOException {
        ByteBuffer byteBuffer = null;
        int writtenTotal = 0;
        int maxWriteBufferSize = nioByteChannel.getMaxWriteBufferSize();
        LOG.debug("[CRAFT-ATOM-NIO] Max write byte size, |maxWriteBytes={}|", Integer.valueOf(maxWriteBufferSize));
        do {
            if (byteBuffer == null) {
                byteBuffer = queue.peek();
                if (byteBuffer == null) {
                    return;
                }
                fireChannelFlush(nioByteChannel, byteBuffer);
            }

            int quota = maxWriteBufferSize - writtenTotal;
            int written = write(nioByteChannel, byteBuffer, quota);
            // Guarded: the arguments copy the whole backing array into a String,
            // which must not happen on every write when debug logging is off.
            if (LOG.isDebugEnabled()) {
                LOG.debug("[CRAFT-ATOM-NIO] Flush |buffer={}, channel={}, bytes={}, size={}, qota={}, remaining={}|", new Object[]{new String(byteBuffer.array()), nioByteChannel, Integer.valueOf(written), Integer.valueOf(byteBuffer.array().length), Integer.valueOf(quota), Integer.valueOf(byteBuffer.remaining())});
            }
            writtenTotal += written;

            if (!byteBuffer.hasRemaining()) {
                LOG.debug("[CRAFT-ATOM-NIO] The buffer is all flushed, remove it from write queue");
                queue.remove();
                fireChannelWritten(nioByteChannel, byteBuffer);
                byteBuffer = null;
            } else if (written == 0) {
                LOG.debug("[CRAFT-ATOM-NIO] Zero byte be written, maybe kernel buffer is full so we re-interest in writing and later flush it, |channel={}|", nioByteChannel);
                setInterestedInWrite(nioByteChannel, true);
                scheduleFlush(nioByteChannel);
                return;
            } else if (written > 0 && byteBuffer.hasRemaining()) {
                LOG.debug("[CRAFT-ATOM-NIO] The buffer isn't empty, bytes to flush more than max bytes, we re-interest in writing and later flush it, |channel={}|", nioByteChannel);
                setInterestedInWrite(nioByteChannel, true);
                scheduleFlush(nioByteChannel);
                return;
            } else if (writtenTotal >= maxWriteBufferSize && byteBuffer.hasRemaining()) {
                // Only reachable if write() reports a negative count.
                LOG.debug("[CRAFT-ATOM-NIO] Wrote too much, so we re-interest in writing and later flush other bytes, |channel={}|", nioByteChannel);
                setInterestedInWrite(nioByteChannel, true);
                scheduleFlush(nioByteChannel);
                return;
            }
        } while (writtenTotal < maxWriteBufferSize);
    }

    /**
     * Adds or removes write interest on the channel's selection key. Nothing
     * happens when the key is missing or invalid, or when the interest set
     * already has the requested state.
     *
     * @param nioByteChannel channel whose key is updated
     * @param z              {@code true} to add write interest, {@code false} to remove it
     */
    private void setInterestedInWrite(NioByteChannel nioByteChannel, boolean z) {
        SelectionKey key = nioByteChannel.getSelectionKey();
        if (key != null && key.isValid()) {
            final int current = key.interestOps();
            final int wanted;
            if (z) {
                wanted = current | SelectionKey.OP_WRITE;
            } else {
                wanted = current & ~SelectionKey.OP_WRITE;
            }
            if (wanted != current) {
                key.interestOps(wanted);
            }
        }
    }

    /**
     * Writes at most {@code i} bytes of the buffer to the channel through the
     * TCP or UDP path according to the configured protocol.
     *
     * @param nioByteChannel channel to write to
     * @param byteBuffer     data to write
     * @param i              maximum number of bytes allowed for this write
     * @return number of bytes written; 0 when the buffer has nothing remaining
     * @throws IOException if the channel write fails
     */
    private int write(NioByteChannel nioByteChannel, ByteBuffer byteBuffer, int i) throws IOException {
        LOG.debug("[CRAFT-ATOM-NIO] Allow write max len={}, Waiting write byte buffer={}", Integer.valueOf(i), byteBuffer);

        int written = 0;
        if (byteBuffer.hasRemaining()) {
            final int allowed = Math.min(byteBuffer.remaining(), i);
            if (this.protocol.equals(IoProtocol.TCP)) {
                written = writeTcp(nioByteChannel, byteBuffer, allowed);
            } else if (this.protocol.equals(IoProtocol.UDP)) {
                written = writeUdp(nioByteChannel, byteBuffer, allowed);
            }
        }

        LOG.debug("[CRAFT-ATOM-NIO] Actual written byte size, |writtenBytes={}|", Integer.valueOf(written));
        return written;
    }

    /**
     * Writes up to {@code i} bytes of the buffer over TCP. When the buffer holds
     * more than {@code i} bytes its limit is lowered for the write and always
     * restored afterwards.
     *
     * @param nioByteChannel channel to write to
     * @param byteBuffer     data to write
     * @param i              maximum number of bytes to write
     * @return number of bytes written
     * @throws IOException if the channel write fails
     */
    private int writeTcp(NioByteChannel nioByteChannel, ByteBuffer byteBuffer, int i) throws IOException {
        if (byteBuffer.remaining() > i) {
            final int savedLimit = byteBuffer.limit();
            byteBuffer.limit(byteBuffer.position() + i);
            try {
                return nioByteChannel.writeTcp(byteBuffer);
            } finally {
                byteBuffer.limit(savedLimit);
            }
        }
        return nioByteChannel.writeTcp(byteBuffer);
    }

    /**
     * Sends up to {@code i} bytes of the buffer over UDP to the channel's remote
     * address. When the buffer holds more than {@code i} bytes its limit is
     * lowered for the write and always restored afterwards.
     *
     * @param nioByteChannel channel to write to
     * @param byteBuffer     data to write
     * @param i              maximum number of bytes to write
     * @return number of bytes written
     * @throws IOException if the channel write fails
     */
    private int writeUdp(NioByteChannel nioByteChannel, ByteBuffer byteBuffer, int i) throws IOException {
        if (byteBuffer.remaining() > i) {
            final int savedLimit = byteBuffer.limit();
            byteBuffer.limit(byteBuffer.position() + i);
            try {
                return nioByteChannel.writeUdp(byteBuffer, nioByteChannel.getRemoteAddress());
            } finally {
                byteBuffer.limit(savedLimit);
            }
        }
        return nioByteChannel.writeUdp(byteBuffer, nioByteChannel.getRemoteAddress());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Schedules the given channel for closing and wakes the selector up.
     * A {@code null} channel is ignored.
     *
     * @param nioByteChannel channel to remove; may be {@code null}
     * @throws IllegalStateException if the processor has been shut down
     */
    public void remove(NioByteChannel nioByteChannel) {
        if (this.shutdown) {
            throw new IllegalStateException("The processor is already shutdown!");
        }
        if (nioByteChannel != null) {
            scheduleClose(nioByteChannel);
            wakeup();
        }
    }

    /**
     * Returns a snapshot of the current sizes of the new, flushing and closing
     * channel queues.
     *
     * @return a freshly populated {@code NioProcessorX}
     */
    @Override // io.craft.atom.nio.NioReactor
    /* renamed from: x, reason: merged with bridge method [inline-methods] */
    public IoProcessorX mo1x() {
        final NioProcessorX snapshot = new NioProcessorX();
        final int pendingNew = this.newChannels.size();
        snapshot.setNewChannelCount(pendingNew);
        final int pendingFlush = this.flushingChannels.size();
        snapshot.setFlushingChannelCount(pendingFlush);
        final int pendingClose = this.closingChannels.size();
        snapshot.setClosingChannelCount(pendingClose);
        return snapshot;
    }

    /**
     * Sets the transport protocol used to choose between the TCP and UDP
     * read/write paths.
     *
     * @param ioProtocol protocol to use
     */
    public void setProtocol(IoProtocol ioProtocol) {
        protocol = ioProtocol;
    }

    /** Dispatches a {@code CHANNEL_OPENED} event for the given channel. */
    private void fireChannelOpened(NioByteChannel nioByteChannel) {
        NioByteChannelEvent opened = new NioByteChannelEvent(ChannelEventType.CHANNEL_OPENED, nioByteChannel, this.handler);
        this.dispatcher.dispatch(opened);
    }

    /**
     * Dispatches a {@code CHANNEL_READ} event carrying a copy of the first
     * {@code i} bytes of the buffer's content.
     *
     * <p>The copy honours the buffer's array offset and also works for buffers
     * without an accessible backing array (for example direct buffers), where
     * the bytes are read through a duplicate so the original buffer's position
     * and limit stay untouched.
     *
     * @param nioByteChannel channel the bytes were read from
     * @param byteBuffer     buffer holding the bytes, starting at index 0
     * @param i              number of bytes to copy
     */
    private void fireChannelRead(NioByteChannel nioByteChannel, ByteBuffer byteBuffer, int i) {
        byte[] data = new byte[i];
        if (byteBuffer.hasArray()) {
            System.arraycopy(byteBuffer.array(), byteBuffer.arrayOffset(), data, 0, i);
        } else {
            ByteBuffer view = byteBuffer.duplicate();
            // Rewind the duplicate to index 0 with the limit at capacity.
            view.clear();
            view.get(data);
        }
        this.dispatcher.dispatch(new NioByteChannelEvent(ChannelEventType.CHANNEL_READ, nioByteChannel, this.handler, data));
    }

    /** Dispatches a {@code CHANNEL_FLUSH} event carrying the buffer's backing array. */
    private void fireChannelFlush(NioByteChannel nioByteChannel, ByteBuffer byteBuffer) {
        byte[] payload = byteBuffer.array();
        NioByteChannelEvent flushing = new NioByteChannelEvent(ChannelEventType.CHANNEL_FLUSH, nioByteChannel, this.handler, payload);
        this.dispatcher.dispatch(flushing);
    }

    /** Dispatches a {@code CHANNEL_WRITTEN} event carrying the buffer's backing array. */
    private void fireChannelWritten(NioByteChannel nioByteChannel, ByteBuffer byteBuffer) {
        byte[] payload = byteBuffer.array();
        NioByteChannelEvent written = new NioByteChannelEvent(ChannelEventType.CHANNEL_WRITTEN, nioByteChannel, this.handler, payload);
        this.dispatcher.dispatch(written);
    }

    /** Dispatches a {@code CHANNEL_THROWN} event carrying the given exception. */
    private void fireChannelThrown(NioByteChannel nioByteChannel, Exception exc) {
        NioByteChannelEvent thrown = new NioByteChannelEvent(ChannelEventType.CHANNEL_THROWN, nioByteChannel, this.handler, exc);
        this.dispatcher.dispatch(thrown);
    }

    /** Dispatches a {@code CHANNEL_CLOSED} event for the given channel. */
    private void fireChannelClosed(NioByteChannel nioByteChannel) {
        NioByteChannelEvent closed = new NioByteChannelEvent(ChannelEventType.CHANNEL_CLOSED, nioByteChannel, this.handler);
        this.dispatcher.dispatch(closed);
    }

    /**
     * Describes this processor: the superclass description followed by the
     * channel queues, the UDP channel map and the configuration.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("NioProcessor(super=");
        text.append(super.toString());
        text.append(", newChannels=").append(this.newChannels);
        text.append(", flushingChannels=").append(this.flushingChannels);
        text.append(", closingChannels=").append(this.closingChannels);
        text.append(", udpChannels=").append(this.udpChannels);
        text.append(", config=").append(this.config);
        text.append(")");
        return text.toString();
    }
}
