package io.gridgo.socket.impl;

import io.gridgo.connector.impl.AbstractHasResponderConsumer;
import io.gridgo.connector.support.config.ConnectorContext;
import io.gridgo.framework.support.Message;
import io.gridgo.socket.Socket;
import io.gridgo.socket.SocketConnector;
import io.gridgo.socket.SocketConstants;
import io.gridgo.socket.SocketConsumer;
import io.gridgo.socket.SocketFactory;
import io.gridgo.socket.SocketOptions;
import io.gridgo.utils.ThreadUtils;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.text.DecimalFormat;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gridgo/socket/impl/DefaultSocketConsumer.class */
public class DefaultSocketConsumer extends AbstractHasResponderConsumer implements SocketConsumer {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(DefaultSocketConsumer.class);

    /*
     * Receive counters. They are bumped through the callbacks handed to
     * SocketUtils.startPolling from inside the poller thread's body and are read by the
     * monitor thread and by the public getters, so they are volatile: readers on other
     * threads see fresh values and 64-bit reads cannot tear.
     * NOTE(review): "+=" on a volatile is not atomic; this relies on there being a single
     * writer — presumably the poller thread; confirm against SocketUtils.startPolling.
     */
    private volatile long totalRecvBytes;
    private volatile long totalRecvMessages;

    /** Thread running the polling loop; created in onStart and cleared in onStop. */
    private Thread poller;

    /** Capacity, in bytes, of the receive buffer allocated for the poller. */
    private final int bufferSize;

    /** Factory used to create the underlying socket. */
    private final SocketFactory factory;

    /** Socket options; their type selects how the socket is bound or connected. */
    private final SocketOptions options;

    /** Address the socket binds or connects to. */
    private final String address;

    /** Counted down by the poller once it has finished and closed the socket. */
    private CountDownLatch doneSignal;

    /** Passed to SocketUtils.startPolling; switched on when the socket is a subscriber. */
    private boolean autoSkipTopicHeader;

    /** Whether a direct (off-heap) receive buffer is requested. */
    private boolean useDirectBuffer;

    /** Port reported by the socket after binding; null while nothing is bound. */
    private Integer bindingPort;

    /** Optional statistics thread; null when monitoring is disabled. */
    private final Thread monitorThread;

    /* loaded from: input_file:io/gridgo/socket/impl/DefaultSocketConsumer$SocketConsumerBuidler.class */
    /**
     * Fluent builder collecting the arguments of the private {@link DefaultSocketConsumer}
     * constructor. Every setter stores its argument and returns this builder so calls can
     * be chained; {@link #build()} passes the collected values on unchanged.
     */
    public static class SocketConsumerBuidler {
        private ConnectorContext context;
        private SocketFactory factory;
        private SocketOptions options;
        private String address;
        private int bufferSize;
        private Boolean useDirectBuffer;
        private Boolean monitorEnabled;

        /** Package-private: instances are obtained through DefaultSocketConsumer.builder(). */
        SocketConsumerBuidler() {
        }

        /** Sets the connector context handed to the consumer's super constructor. */
        public SocketConsumerBuidler context(ConnectorContext value) {
            context = value;
            return this;
        }

        /** Sets the factory used to create the socket. */
        public SocketConsumerBuidler factory(SocketFactory value) {
            factory = value;
            return this;
        }

        /** Sets the socket options. */
        public SocketConsumerBuidler options(SocketOptions value) {
            options = value;
            return this;
        }

        /** Sets the address the socket binds or connects to. */
        public SocketConsumerBuidler address(String value) {
            address = value;
            return this;
        }

        /** Sets the receive buffer capacity in bytes. */
        public SocketConsumerBuidler bufferSize(int value) {
            bufferSize = value;
            return this;
        }

        /** Sets the direct-buffer preference; {@code null} keeps the consumer's default. */
        public SocketConsumerBuidler useDirectBuffer(Boolean value) {
            useDirectBuffer = value;
            return this;
        }

        /** Enables the monitor thread when {@code true}; {@code null} or {@code false} disables it. */
        public SocketConsumerBuidler monitorEnabled(Boolean value) {
            monitorEnabled = value;
            return this;
        }

        /** Creates the consumer from the values collected so far. */
        public DefaultSocketConsumer build() {
            return new DefaultSocketConsumer(context, factory, options, address, bufferSize, useDirectBuffer, monitorEnabled);
        }

        /** Renders every collected value, mainly for diagnostics. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("DefaultSocketConsumer.SocketConsumerBuidler(");
            text.append("context=").append(context);
            text.append(", factory=").append(factory);
            text.append(", options=").append(options);
            text.append(", address=").append(address);
            text.append(", bufferSize=").append(bufferSize);
            text.append(", useDirectBuffer=").append(useDirectBuffer);
            text.append(", monitorEnabled=").append(monitorEnabled);
            return text.append(")").toString();
        }
    }

    /**
     * Creates a consumer; reachable only through {@link SocketConsumerBuidler#build()}.
     *
     * @param context         connector context passed to the super constructor
     * @param factory         factory used to create the socket
     * @param options         socket options
     * @param address         address to bind or connect to
     * @param bufferSize      receive buffer capacity in bytes
     * @param useDirectBuffer explicit direct-buffer choice; when {@code null} the default is
     *                        {@code true} only if the native byte order is big-endian
     * @param monitorEnabled  creates the monitor thread when {@code true}; {@code null} or
     *                        {@code false} leaves it unset
     */
    private DefaultSocketConsumer(ConnectorContext context, SocketFactory factory, SocketOptions options, String address, int bufferSize, Boolean useDirectBuffer, Boolean monitorEnabled) {
        super(context);
        this.factory = factory;
        this.options = options;
        this.address = address;
        this.bufferSize = bufferSize;
        this.autoSkipTopicHeader = false;
        this.useDirectBuffer = (useDirectBuffer != null)
                ? useDirectBuffer.booleanValue()
                : (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN);
        // The thread is only created here; it is started later from onStart().
        this.monitorThread = Boolean.TRUE.equals(monitorEnabled) ? new Thread(this::monitor) : null;
    }

    /**
     * Body of the monitor thread: roughly once per second, logs the running receive totals
     * and the number of messages received since the previous report.
     *
     * <p>The loop ends when {@code ThreadUtils.isShuttingDown()} is true or
     * {@code ThreadUtils.sleepSilence} returns false (presumably when the sleep is
     * interrupted — confirm against ThreadUtils).
     */
    private void monitor() {
        String name = generateName();
        Thread.currentThread().setName(name + ".monitor");
        log.info("start monitoring socket consumer: {}", name);
        DecimalFormat formatter = new DecimalFormat("###,###.##");
        long lastReportedMessages = 0;
        while (!ThreadUtils.isShuttingDown() && ThreadUtils.sleepSilence(1000L)) {
            // Snapshot each counter exactly once per tick. Re-reading the field for the
            // baseline would skip any messages that arrived after the delta was computed,
            // so they would never show up in a reported pace.
            long bytesNow = this.totalRecvBytes;
            long messagesNow = this.totalRecvMessages;
            long pace = messagesNow - lastReportedMessages;
            if (pace > 0) {
                log.debug("total received bytes: {}, total received msg: {} -> pace: {}",
                        formatter.format(bytesNow), formatter.format(messagesNow), formatter.format(pace));
                lastReportedMessages = messagesNow;
            }
        }
    }

    /**
     * Builds this consumer's name: the literal prefix {@code "consumer."} followed by the
     * unique identifier.
     *
     * @return the generated name
     */
    protected String generateName() {
        String identifier = getUniqueIdentifier();
        return "consumer." + identifier;
    }

    /**
     * Joins the factory type, the options type and the address with dots.
     *
     * @return {@code <factoryType>.<optionsType>.<address>}
     */
    private String getUniqueIdentifier() {
        String factoryPart = this.factory.getType() + ".";
        String typedPart = factoryPart + this.options.getType();
        return typedPart + "." + this.address;
    }

    /**
     * Creates the socket and wires it according to the (case-insensitive) options type:
     * <ul>
     *   <li>pull — binds to the address and records the binding port;</li>
     *   <li>subscribe — connects to the address, subscribes to the configured
     *       {@code "topic"} (empty string when absent) and enables topic-header skipping;</li>
     *   <li>pair — binds, records the binding port and installs a
     *       {@link DefaultSocketResponder} on the same socket.</li>
     * </ul>
     * Any other type leaves the socket neither bound nor connected. A
     * {@code "receiveTimeout"} of 100 is applied when the configuration does not define one.
     *
     * @return the created socket
     */
    private Socket initSocket() {
        Socket socket = this.factory.createSocket(this.options);
        if (!this.options.getConfig().containsKey("receiveTimeout")) {
            socket.applyConfig("receiveTimeout", 100);
        }

        // Locale.ROOT keeps the comparison stable under locales with special case mappings
        // (e.g. Turkish, where "PAIR" would lower-case to a dotless-i string).
        String type = this.options.getType().toLowerCase(Locale.ROOT);
        if (type.equals(SocketConstants.TYPE_PULL)) {
            bindAndRecordPort(socket);
        } else if (type.equals(SocketConstants.TYPE_SUBSCRIBE)) {
            socket.connect(this.address);
            socket.subscribe((String) this.options.getConfig().getOrDefault("topic", ""));
            this.autoSkipTopicHeader = true;
        } else if (type.equals("pair")) {
            bindAndRecordPort(socket);

            // Go through String.valueOf instead of casting: the fallback for the batch size
            // is an Integer, so a (String) cast fails whenever the key is missing. A missing
            // flag becomes "null", which parseBoolean treats as false.
            boolean batchingEnabled = Boolean.parseBoolean(
                    String.valueOf(this.options.getConfig().get(SocketConstants.BATCHING_ENABLED)));
            int maxBatchSize = 0;
            if (batchingEnabled) {
                maxBatchSize = Integer.parseInt(String.valueOf(this.options.getConfig()
                        .getOrDefault("maxBatchingSize", Integer.valueOf(SocketConnector.DEFAULT_MAX_BATCH_SIZE))));
            }

            setResponder(DefaultSocketResponder.builder()
                    .context(getContext())
                    .socket(socket)
                    .bufferSize(this.bufferSize)
                    .ringBufferSize(2048)
                    .batchingEnabled(batchingEnabled)
                    .maxBatchSize(maxBatchSize)
                    .uniqueIdentifier(getUniqueIdentifier())
                    .useDirectBuffer(Boolean.valueOf(this.useDirectBuffer))
                    .build());
        }
        return socket;
    }

    /** Binds the socket to the configured address and stores the port it reports. */
    private void bindAndRecordPort(Socket socket) {
        socket.bind(this.address);
        this.bindingPort = socket.getBindingPort();
    }

    /**
     * Initialises the socket, resets the receive counters and starts the poller thread,
     * plus the monitor thread when one was configured.
     *
     * <p>The poller allocates the receive buffer, runs {@code SocketUtils.startPolling}
     * and, when that returns or throws, closes the socket and releases the latch that
     * {@code onStop} waits on.
     */
    protected void onStart() {
        final Socket socket = initSocket();
        this.totalRecvBytes = 0L;
        this.totalRecvMessages = 0L;

        // The poller keeps its own reference to the latch: onStop() nulls the field, and
        // re-reading it from the poller afterwards would fail with a NullPointerException.
        final CountDownLatch done = new CountDownLatch(1);
        this.doneSignal = done;

        this.poller = new Thread(() -> {
            try {
                ByteBuffer buffer = (socket.forceUsingDirectBuffer() || this.useDirectBuffer)
                        ? ByteBuffer.allocateDirect(this.bufferSize)
                        : ByteBuffer.allocate(this.bufferSize);
                // Report what was actually allocated; the socket may force a direct buffer
                // even when useDirectBuffer is false.
                log.debug("****** Using {} byte buffer", buffer.isDirect() ? "direct" : "heap");
                SocketUtils.startPolling(socket, buffer, this.autoSkipTopicHeader, this::handleSocketMessage,
                        (bytes) -> {
                            increaseTotalRecvBytes(bytes);
                        }, (messages) -> {
                            increaseTotalRecvMsgs(messages);
                        }, getContext().getExceptionHandler());
            } finally {
                // Always close the socket and release the latch, even if polling or close()
                // throws; otherwise onStop() would block forever in await().
                try {
                    socket.close();
                } finally {
                    done.countDown();
                }
            }
        }, getName() + ".poller");
        this.poller.start();

        if (this.monitorThread != null) {
            // A Thread can be started only once. On a restart the original monitor thread
            // has already run, and start() would throw IllegalThreadStateException after
            // the poller is already up.
            if (this.monitorThread.getState() == Thread.State.NEW) {
                this.monitorThread.start();
            } else {
                log.warn("monitor thread was already started once and cannot be restarted");
            }
        }
    }

    /**
     * Callback for each message delivered by the polling loop: makes sure the message
     * has a payload id, then publishes it with a {@code null} second argument.
     *
     * @param message the received message
     */
    private void handleSocketMessage(final Message message) {
        this.ensurePayloadId(message);
        this.publish(message, null);
    }

    /**
     * Adds the given amount to the running total of received bytes.
     *
     * @param delta number of bytes to add
     */
    private void increaseTotalRecvBytes(long delta) {
        this.totalRecvBytes = this.totalRecvBytes + delta;
    }

    /**
     * Adds the given amount to the running total of received messages.
     *
     * @param delta number of messages to add
     */
    private void increaseTotalRecvMsgs(long delta) {
        this.totalRecvMessages = this.totalRecvMessages + delta;
    }

    /**
     * Stops the consumer: interrupts the monitor thread (if any), interrupts the poller
     * and waits until the poller has signalled completion through the latch, then clears
     * the recorded binding port.
     *
     * <p>Returns early, without waiting, when there is no poller or the poller's
     * interrupt flag is already set.
     */
    protected final void onStop() {
        if (this.monitorThread != null) {
            this.monitorThread.interrupt();
        }

        Thread pollerThread = this.poller;
        if (pollerThread == null || pollerThread.isInterrupted()) {
            return;
        }
        pollerThread.interrupt();
        this.poller = null;

        // Work on a local snapshot so the wait cannot hit a field that was nulled meanwhile.
        CountDownLatch done = this.doneSignal;
        try {
            if (done != null) {
                done.await();
            }
        } catch (InterruptedException e) {
            log.error("Error while waiting for socket to be closed", e);
            // Restore the interrupt flag so callers further up the stack can see it.
            Thread.currentThread().interrupt();
        } finally {
            this.doneSignal = null;
        }
        this.bindingPort = null;
    }

    /**
     * Entry point of the fluent construction API.
     *
     * @return a fresh, empty builder
     */
    public static SocketConsumerBuidler builder() {
        SocketConsumerBuidler fresh = new SocketConsumerBuidler();
        return fresh;
    }

    /**
     * @return the running total of received bytes; reset to zero by each {@code onStart}
     */
    @Override // io.gridgo.socket.SocketConsumer
    public long getTotalRecvBytes() {
        return totalRecvBytes;
    }

    /**
     * @return the running total of received messages; reset to zero by each {@code onStart}
     */
    @Override // io.gridgo.socket.SocketConsumer
    public long getTotalRecvMessages() {
        return totalRecvMessages;
    }

    /**
     * @return the port recorded when the socket was bound, or {@code null} when no port
     *         is currently recorded
     */
    @Override // io.gridgo.socket.HasBindingPort
    public Integer getBindingPort() {
        return bindingPort;
    }
}
