package io.gridgo.socket.impl;

import io.gridgo.bean.BArray;
import io.gridgo.bean.BElement;
import io.gridgo.bean.BObject;
import io.gridgo.framework.support.Message;
import io.gridgo.framework.support.Payload;
import io.gridgo.socket.Socket;
import io.gridgo.socket.SocketConstants;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gridgo/socket/impl/SocketUtils.class */
/**
 * Static helpers for packing several {@link Message}s into one batch message and for
 * running a blocking receive loop that parses incoming buffers back into messages.
 */
public class SocketUtils {
    private static final Logger log = LoggerFactory.getLogger(SocketUtils.class);

    /**
     * Combines the given messages into a single message.
     *
     * <p>When the collection holds exactly one message, that message is returned as is.
     * Otherwise a new message is built whose body is an array with one
     * {@code [id, headers, body]} entry per input message, and whose headers carry
     * {@link SocketConstants#IS_BATCH} = {@code true} and
     * {@link SocketConstants#BATCH_SIZE} = the number of input messages.
     *
     * @param collection the messages to combine; must not be {@code null}
     * @return the single input message, or a newly built batch message
     * @throws NullPointerException if {@code collection} is {@code null}
     */
    public static Message accumulateBatch(@NonNull Collection<Message> collection) {
        if (collection == null) {
            throw new NullPointerException("messages is marked @NonNull but is null");
        }
        if (collection.size() == 1) {
            return collection.iterator().next();
        }
        BArray batchBody = BArray.ofEmpty();
        for (Message message : collection) {
            Payload payload = message.getPayload();
            batchBody.add(BArray.ofSequence(new Object[]{payload.getId().orElse(null), payload.getHeaders(), payload.getBody()}));
        }
        return Message.of(Payload.of(batchBody)
                .addHeader(SocketConstants.IS_BATCH, true)
                .addHeader(SocketConstants.BATCH_SIZE, Integer.valueOf(collection.size())));
    }

    /**
     * Parses one received buffer and dispatches the resulting message(s).
     *
     * <p>The byte counter is invoked before the guarded section, so an exception thrown
     * by it is not routed to {@code exceptionHandler} and propagates to the caller.
     * Any exception raised while flipping, skipping, parsing or dispatching is passed to
     * {@code exceptionHandler}, or logged when no handler was supplied.
     *
     * @param buffer           buffer in write mode, as filled by the socket; it is flipped here
     * @param skipUntilZero    when {@code true}, bytes up to and including the first zero
     *                         byte are discarded before parsing
     * @param receiver         receives every parsed message
     * @param recvByteCounter  receives {@code receivedBytes}
     * @param recvMsgCounter   receives the number of messages carried by this buffer
     * @param exceptionHandler receives failures; may be {@code null}
     * @param receivedBytes    value returned by the socket's receive call
     */
    private static void process(ByteBuffer buffer, boolean skipUntilZero, Consumer<Message> receiver, Consumer<Integer> recvByteCounter, Consumer<Integer> recvMsgCounter, Consumer<Throwable> exceptionHandler, int receivedBytes) {
        recvByteCounter.accept(Integer.valueOf(receivedBytes));
        try {
            buffer.flip();
            if (skipUntilZero) {
                // Consume the zero-terminated prefix (presumably a topic header - confirm
                // against callers). Running off the end of the buffer raises an exception
                // that is handled by the catch block below.
                while (buffer.get() != 0) {
                    // keep reading until the terminating zero byte has been consumed
                }
            }
            Message message = Message.parse(buffer);
            BObject headers = message.headers();
            boolean isBatch = headers != null && headers.getBoolean(SocketConstants.IS_BATCH, false).booleanValue();
            if (isBatch) {
                processBatch(receiver, recvMsgCounter, message, headers);
            } else {
                recvMsgCounter.accept(1);
                receiver.accept(message);
            }
        } catch (Exception e) {
            if (exceptionHandler != null) {
                exceptionHandler.accept(e);
            } else {
                log.error("Error while parse buffer to message", e);
            }
        }
    }

    /**
     * Unpacks a batch message and hands each contained message to {@code receiver}.
     *
     * <p>The message counter receives the {@link SocketConstants#BATCH_SIZE} header value,
     * falling back to the actual number of entries in the body when the header is absent.
     *
     * @param receiver       receives every message parsed from the batch body
     * @param recvMsgCounter receives the batch size
     * @param batchMessage   message whose body is an array of encoded messages
     * @param headers        headers of {@code batchMessage}
     */
    private static void processBatch(Consumer<Message> receiver, Consumer<Integer> recvMsgCounter, Message batchMessage, BObject headers) {
        BArray entries = batchMessage.body().asArray();
        recvMsgCounter.accept(headers.getInteger(SocketConstants.BATCH_SIZE, Integer.valueOf(entries.size())));
        for (BElement entry : entries) {
            receiver.accept(Message.parse(entry));
        }
    }

    /**
     * Runs a receive loop on the calling thread until that thread is interrupted.
     *
     * <p>Each iteration clears {@code byteBuffer}, calls {@link Socket#receive(ByteBuffer)}
     * and, when the returned value is non-negative, parses and dispatches the buffer.
     * Negative return values are ignored and the loop simply polls again.
     *
     * <p>A {@link CountDownLatch} with a count of one is created up front and, if
     * {@code consumer5} is non-null, handed to it. The latch is counted down when the loop
     * ends for any reason, including an exception escaping from the socket or from a
     * callback, so that threads awaiting it are always released.
     *
     * @param socket     socket to receive from
     * @param byteBuffer buffer reused for every receive call
     * @param z          when {@code true}, bytes up to and including the first zero byte
     *                   of each received buffer are skipped before parsing
     * @param consumer   receives every parsed message
     * @param consumer2  receives the value returned by each successful receive call
     * @param consumer3  receives the number of messages carried by each received buffer
     * @param consumer4  receives parse/dispatch failures; when {@code null} they are logged
     * @param consumer5  receives the completion latch; may be {@code null}
     */
    public static void startPolling(Socket socket, ByteBuffer byteBuffer, boolean z, Consumer<Message> consumer, Consumer<Integer> consumer2, Consumer<Integer> consumer3, Consumer<Throwable> consumer4, Consumer<CountDownLatch> consumer5) {
        CountDownLatch doneSignal = new CountDownLatch(1);
        if (consumer5 != null) {
            consumer5.accept(doneSignal);
        }
        try {
            while (!Thread.currentThread().isInterrupted()) {
                byteBuffer.clear();
                int received = socket.receive(byteBuffer);
                if (received >= 0) {
                    process(byteBuffer, z, consumer, consumer2, consumer3, consumer4, received);
                }
            }
        } finally {
            // Always release waiters, even when receive() or a callback throws.
            doneSignal.countDown();
        }
    }
}
