package io.gridgo.socket.impl;

import io.gridgo.connector.Receiver;
import io.gridgo.connector.impl.AbstractConsumer;
import io.gridgo.connector.support.config.ConnectorContext;
import io.gridgo.connector.support.exceptions.FailureHandlerAware;
import io.gridgo.framework.support.Message;
import io.gridgo.socket.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gridgo/socket/impl/DefaultSocketReceiver.class */
public class DefaultSocketReceiver extends AbstractConsumer implements Receiver, FailureHandlerAware<DefaultSocketReceiver> {
    private static final Logger log = LoggerFactory.getLogger(DefaultSocketReceiver.class);
    private Thread poller;
    private final Socket socket;
    private final int bufferSize;
    private CountDownLatch doneSignal;
    private final String uniqueIdentifier;
    private long totalRecvBytes;
    private long totalRecvMessages;
    private Function<Throwable, Message> failureHandler;

    public DefaultSocketReceiver(ConnectorContext connectorContext, Socket socket, int i, String str) {
        super(connectorContext);
        this.socket = socket;
        this.bufferSize = i;
        this.uniqueIdentifier = str;
        setFailureHandler(connectorContext.getExceptionHandler());
    }

    public DefaultSocketReceiver setFailureHandler(Function<Throwable, Message> function) {
        this.failureHandler = function;
        return this;
    }

    protected String generateName() {
        return "receiver." + this.uniqueIdentifier;
    }

    protected void onStart() {
        this.totalRecvBytes = 0L;
        this.totalRecvMessages = 0L;
        this.doneSignal = new CountDownLatch(1);
        this.poller = new Thread(() -> {
            SocketUtils.startPolling(this.socket, ByteBuffer.allocateDirect(this.bufferSize), false, this::handleSocketMessage, (v1) -> {
                increaseTotalRecvBytes(v1);
            }, (v1) -> {
                increaseTotalRecvMsgs(v1);
            }, getContext().getExceptionHandler());
            System.out.println("Closing socket: " + this.socket.getEndpoint());
            this.socket.close();
            this.doneSignal.countDown();
        }, this.socket.getEndpoint().getAddress() + " POLLER");
        this.poller.start();
    }

    protected void onStop() {
        this.poller.interrupt();
        this.poller = null;
        try {
            this.doneSignal.await();
            this.doneSignal = null;
        } catch (InterruptedException e) {
            log.error("Error while await for socket to close", e);
        }
    }

    private void handleSocketMessage(Message message) {
        ensurePayloadId(message);
        publish(message, null);
    }

    private void increaseTotalRecvBytes(long j) {
        this.totalRecvBytes += j;
    }

    private void increaseTotalRecvMsgs(long j) {
        this.totalRecvMessages += j;
    }

    public long getTotalRecvBytes() {
        return this.totalRecvBytes;
    }

    public long getTotalRecvMessages() {
        return this.totalRecvMessages;
    }

    protected Function<Throwable, Message> getFailureHandler() {
        return this.failureHandler;
    }

    /* renamed from: setFailureHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m5setFailureHandler(Function function) {
        return setFailureHandler((Function<Throwable, Message>) function);
    }
}
