package io.gridgo.socket.impl;

import io.gridgo.connector.impl.AbstractHasResponderConsumer;
import io.gridgo.connector.support.config.ConnectorContext;
import io.gridgo.socket.Socket;
import io.gridgo.socket.SocketConnector;
import io.gridgo.socket.SocketConstants;
import io.gridgo.socket.SocketConsumer;
import io.gridgo.socket.SocketFactory;
import io.gridgo.socket.SocketOptions;
import io.gridgo.utils.ThreadUtils;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/* loaded from: input_file:io/gridgo/socket/impl/DefaultSocketConsumer.class */
public class DefaultSocketConsumer extends AbstractHasResponderConsumer implements SocketConsumer {
    private long totalRecvBytes;
    private long totalRecvMessages;
    private Thread poller;
    private final int bufferSize;
    private final SocketFactory factory;
    private final SocketOptions options;
    private final String address;
    private CountDownLatch stopDoneTrigger;
    private boolean autoSkipTopicHeader;

    public DefaultSocketConsumer(ConnectorContext connectorContext, SocketFactory socketFactory, SocketOptions socketOptions, String str, int i) {
        super(connectorContext);
        this.autoSkipTopicHeader = false;
        this.factory = socketFactory;
        this.options = socketOptions;
        this.address = str;
        this.bufferSize = i;
    }

    protected final void onStop() {
        if (this.poller == null || this.poller.isInterrupted()) {
            return;
        }
        this.poller.interrupt();
        this.poller = null;
        try {
            this.stopDoneTrigger.await();
            this.stopDoneTrigger = null;
        } catch (InterruptedException e) {
            throw new RuntimeException("error while waiting for stopped", e);
        }
    }

    private void poll(Socket socket, Consumer<CountDownLatch> consumer) {
        ByteBuffer allocateDirect = ByteBuffer.allocateDirect(this.bufferSize);
        Thread.currentThread().setName("[POLLER] " + getName());
        SocketUtils.startPolling(socket, allocateDirect, this.autoSkipTopicHeader, message -> {
            ensurePayloadId(message);
            publish(message, null);
        }, num -> {
            this.totalRecvBytes += num.intValue();
        }, num2 -> {
            this.totalRecvMessages += num2.intValue();
        }, getContext().getExceptionHandler(), consumer);
        socket.close();
        this.poller = null;
    }

    private Socket initSocket() {
        Socket createSocket = this.factory.createSocket(this.options);
        if (!this.options.getConfig().containsKey("receiveTimeout")) {
            createSocket.applyConfig("receiveTimeout", 100);
        }
        String lowerCase = this.options.getType().toLowerCase();
        boolean z = -1;
        switch (lowerCase.hashCode()) {
            case 114240:
                if (lowerCase.equals(SocketConstants.TYPE_SUBSCRIBE)) {
                    z = true;
                    break;
                }
                break;
            case 3433178:
                if (lowerCase.equals("pair")) {
                    z = 2;
                    break;
                }
                break;
            case 3452485:
                if (lowerCase.equals(SocketConstants.TYPE_PULL)) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                createSocket.bind(this.address);
                break;
            case true:
                createSocket.connect(this.address);
                createSocket.subscribe((String) this.options.getConfig().getOrDefault("topic", ""));
                this.autoSkipTopicHeader = true;
                break;
            case true:
                createSocket.bind(this.address);
                int i = 0;
                boolean parseBoolean = Boolean.parseBoolean((String) this.options.getConfig().get(SocketConstants.BATCHING_ENABLED));
                if (parseBoolean) {
                    i = Integer.valueOf((String) this.options.getConfig().getOrDefault("maxBatchingSize", Integer.valueOf(SocketConnector.DEFAULT_MAX_BATCH_SIZE))).intValue();
                }
                setResponder(new DefaultSocketResponder(getContext(), createSocket, this.bufferSize, SocketConnector.DEFAULT_RINGBUFFER_SIZE, parseBoolean, i, getUniqueIdentifier()));
                break;
        }
        return createSocket;
    }

    protected void onStart() {
        Socket initSocket = initSocket();
        AtomicReference atomicReference = new AtomicReference();
        this.poller = new Thread(() -> {
            poll(initSocket, countDownLatch -> {
                atomicReference.set(countDownLatch);
            });
        });
        this.totalRecvBytes = 0L;
        this.totalRecvMessages = 0L;
        this.poller.start();
        ThreadUtils.sleep(100L);
        ThreadUtils.busySpin(10L, () -> {
            return Boolean.valueOf(atomicReference.get() == null);
        });
        this.stopDoneTrigger = (CountDownLatch) atomicReference.get();
    }

    protected String generateName() {
        return "consumer." + getUniqueIdentifier();
    }

    private String getUniqueIdentifier() {
        return this.factory.getType() + "." + this.options.getType() + "." + this.address;
    }

    @Override // io.gridgo.socket.SocketConsumer
    public long getTotalRecvBytes() {
        return this.totalRecvBytes;
    }

    @Override // io.gridgo.socket.SocketConsumer
    public long getTotalRecvMessages() {
        return this.totalRecvMessages;
    }
}
