package com.firefly.net.tcp.nio;

import com.firefly.net.Client;
import com.firefly.net.Config;
import com.firefly.net.Decoder;
import com.firefly.net.Encoder;
import com.firefly.net.Handler;
import com.firefly.net.Worker;
import com.firefly.net.event.DefaultEventManager;
import com.firefly.utils.log.Log;
import com.firefly.utils.log.LogFactory;
import com.firefly.utils.time.Millisecond100Clock;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:com/firefly/net/tcp/nio/TcpClient.class */
public class TcpClient implements Client {
    private static Log log = LogFactory.getInstance().getLog("firefly-system");
    private Config config;
    private Worker[] workers;
    private Thread consumerThread;
    private Consumer consumer;
    private Selector selector;
    private final AtomicBoolean wakenUp;
    private AtomicInteger sessionId;
    private volatile boolean start;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/firefly/net/tcp/nio/TcpClient$ChannelInfo.class */
    public final class ChannelInfo {
        public SocketChannel channel;
        public int id;

        private ChannelInfo() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/firefly/net/tcp/nio/TcpClient$Consumer.class */
    public final class Consumer implements Runnable {
        private Queue<ChannelInfo> queue;

        private Consumer() {
            this.queue = new LinkedTransferQueue();
        }

        public void registerConnectedEvent(SocketChannel socketChannel, int i) {
            ChannelInfo channelInfo = new ChannelInfo();
            channelInfo.channel = socketChannel;
            channelInfo.id = i;
            this.queue.offer(channelInfo);
            if (TcpClient.this.wakenUp.compareAndSet(false, true)) {
                TcpClient.this.selector.wakeup();
            }
        }

        private void processRegisterTaskQueue() throws ClosedChannelException {
            while (true) {
                ChannelInfo poll = this.queue.poll();
                if (poll == null) {
                    return;
                }
                poll.channel.register(TcpClient.this.selector, 8, Integer.valueOf(poll.id));
                TcpClient.log.debug("register channel {}", new Object[]{Integer.valueOf(poll.id)});
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            while (TcpClient.this.start) {
                TcpClient.this.wakenUp.set(false);
                try {
                    TcpClient.this.selector.select(1000L);
                    if (TcpClient.this.wakenUp.get()) {
                        TcpClient.this.selector.wakeup();
                    }
                    processRegisterTaskQueue();
                    Iterator<SelectionKey> it = TcpClient.this.selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey next = it.next();
                        it.remove();
                        if (next.isConnectable()) {
                            SocketChannel socketChannel = (SocketChannel) next.channel();
                            if (socketChannel.isConnectionPending() && socketChannel.finishConnect()) {
                                int intValue = ((Integer) next.attachment()).intValue();
                                TcpClient.log.debug("connection {} has finished in select loop", new Object[]{Integer.valueOf(intValue)});
                                TcpClient.this.accept(socketChannel, intValue);
                            }
                        }
                    }
                } catch (IOException e) {
                    TcpClient.log.error("Failed to create a connection.", e, new Object[0]);
                }
            }
        }
    }

    public TcpClient() {
        this.wakenUp = new AtomicBoolean();
        this.sessionId = new AtomicInteger(0);
        this.start = false;
    }

    public TcpClient(Decoder decoder, Encoder encoder, Handler handler) {
        this.wakenUp = new AtomicBoolean();
        this.sessionId = new AtomicInteger(0);
        this.start = false;
        this.config = new Config();
        this.config.setDecoder(decoder);
        this.config.setEncoder(encoder);
        this.config.setHandler(handler);
    }

    public TcpClient(Decoder decoder, Encoder encoder, Handler handler, int i) {
        this.wakenUp = new AtomicBoolean();
        this.sessionId = new AtomicInteger(0);
        this.start = false;
        this.config = new Config();
        this.config.setDecoder(decoder);
        this.config.setEncoder(encoder);
        this.config.setHandler(handler);
        this.config.setTimeout(i);
    }

    private synchronized Client init() throws IOException {
        if (this.start) {
            return this;
        }
        if (this.config == null) {
            throw new IllegalArgumentException("init error config is null");
        }
        DefaultEventManager defaultEventManager = new DefaultEventManager(this.config);
        log.info("client worker num: {}", new Object[]{Integer.valueOf(this.config.getWorkerThreads())});
        this.workers = new Worker[this.config.getWorkerThreads()];
        for (int i = 0; i < this.config.getWorkerThreads(); i++) {
            this.workers[i] = new TcpWorker(this.config, i, defaultEventManager);
        }
        this.selector = Selector.open();
        this.consumer = new Consumer();
        this.consumerThread = new Thread(this.consumer, this.config.getClientName());
        this.start = true;
        this.consumerThread.start();
        return this;
    }

    @Override // com.firefly.net.Client
    public void setConfig(Config config) {
        this.config = config;
    }

    @Override // com.firefly.net.Client
    public int connect(String str, int i) {
        int andIncrement = this.sessionId.getAndIncrement();
        connect(str, i, andIncrement);
        return andIncrement;
    }

    @Override // com.firefly.net.Client
    public void connect(String str, int i, int i2) {
        try {
            if (!this.start) {
                init();
            }
            SocketChannel open = SocketChannel.open();
            open.configureBlocking(false);
            if (!open.connect(new InetSocketAddress(str, i)) || !open.isConnectionPending() || !open.finishConnect()) {
                this.consumer.registerConnectedEvent(open, i2);
            } else {
                log.debug("connection {} has finished immediately", new Object[]{Integer.valueOf(i2)});
                accept(open, i2);
            }
        } catch (IOException e) {
            log.error("connect error", e, new Object[0]);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void accept(SocketChannel socketChannel, int i) {
        try {
            int abs = Math.abs(i) % this.workers.length;
            log.debug("accept sessionId [{}] and worker index [{}]", new Object[]{Integer.valueOf(i), Integer.valueOf(abs)});
            this.workers[abs].registerChannel(socketChannel, i);
        } catch (Exception e) {
            log.error("Failed to initialize an accepted socket.", e, new Object[0]);
            try {
                socketChannel.close();
            } catch (IOException e2) {
                log.error("Failed to close a partially accepted socket.", e2, new Object[0]);
            }
        }
    }

    @Override // com.firefly.net.Client
    public void shutdown() {
        for (Worker worker : this.workers) {
            worker.shutdown();
        }
        this.start = false;
        Millisecond100Clock.stop();
    }
}
