package org.nd4j.parameterserver.distributed.transport;

import io.aeron.Aeron;
import io.aeron.FragmentAssembler;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import io.aeron.logbuffer.Header;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.NonNull;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.SleepingIdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.nd4j.linalg.exception.ND4JIllegalStateException;
import org.nd4j.parameterserver.distributed.conf.VoidConfiguration;
import org.nd4j.parameterserver.distributed.enums.NodeRole;
import org.nd4j.parameterserver.distributed.logic.completion.Clipboard;
import org.nd4j.parameterserver.distributed.messages.Frame;
import org.nd4j.parameterserver.distributed.messages.MeaningfulMessage;
import org.nd4j.parameterserver.distributed.messages.VoidMessage;
import org.nd4j.parameterserver.distributed.transport.Transport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/nd4j/parameterserver/distributed/transport/BaseTransport.class */
public abstract class BaseTransport implements Transport {
    private static final Logger log = LoggerFactory.getLogger(BaseTransport.class);
    protected VoidConfiguration voidConfiguration;
    protected NodeRole nodeRole;
    protected Aeron aeron;
    protected Aeron.Context context;
    protected String unicastChannelUri;
    protected String ip;
    protected MediaDriver driver;
    protected Publication publicationForShards;
    protected Publication publicationForClients;
    protected Subscription subscriptionForShards;
    protected Subscription subscriptionForClients;
    protected FragmentAssembler messageHandlerForShards;
    protected FragmentAssembler messageHandlerForClients;
    protected Thread threadA;
    protected Thread threadB;
    protected Clipboard clipboard;
    protected int port = 0;
    protected LinkedBlockingQueue<VoidMessage> messages = new LinkedBlockingQueue<>();
    protected Map<Long, MeaningfulMessage> completed = new ConcurrentHashMap();
    protected AtomicBoolean runner = new AtomicBoolean(true);
    protected AtomicLong frameCount = new AtomicLong(0);
    protected IdleStrategy idler = new SleepingIdleStrategy(1000);
    protected IdleStrategy feedbackIdler = new SleepingIdleStrategy(100000);
    protected Transport.ThreadingModel threadingModel = Transport.ThreadingModel.DEDICATED_THREADS;
    protected short targetIndex = 0;
    protected short shardIndex = 0;

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public MeaningfulMessage sendMessageAndGetResponse(@NonNull VoidMessage voidMessage) {
        if (voidMessage == null) {
            throw new NullPointerException("message");
        }
        long currentTimeMillis = System.currentTimeMillis();
        long taskId = voidMessage.getTaskId();
        sendCommandToShard(voidMessage);
        new AtomicLong(0L);
        long currentTimeMillis2 = System.currentTimeMillis();
        do {
            MeaningfulMessage meaningfulMessage = this.completed.get(Long.valueOf(taskId));
            if (meaningfulMessage != null) {
                this.completed.remove(Long.valueOf(taskId));
                long currentTimeMillis3 = System.currentTimeMillis() - currentTimeMillis;
                if ((voidMessage instanceof Frame) && this.frameCount.incrementAndGet() % 1000 == 0) {
                    log.info("Frame of {} messages [{}] processed in {} ms", new Object[]{Integer.valueOf(((Frame) voidMessage).size()), Long.valueOf(voidMessage.getTaskId()), Long.valueOf(currentTimeMillis3)});
                }
                return meaningfulMessage;
            }
            try {
                this.feedbackIdler.idle();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } while (System.currentTimeMillis() - currentTimeMillis2 <= this.voidConfiguration.getResponseTimeout());
        log.info("Resending request for taskId [{}]", Long.valueOf(taskId));
        voidMessage.incrementRetransmitCount();
        if (voidMessage.getRetransmitCount() > 20) {
            throw new RuntimeException("Giving up on message delivery...");
        }
        return sendMessageAndGetResponse(voidMessage);
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public void setIpAndPort(@NonNull String str, int i) {
        if (str == null) {
            throw new NullPointerException("ip");
        }
        this.ip = str;
        this.port = i;
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public void sendMessage(@NonNull VoidMessage voidMessage) {
        if (voidMessage == null) {
            throw new NullPointerException("message");
        }
        switch (voidMessage.getMessageType()) {
            case 0:
            case 1:
            case 2:
            case 3:
            case 4:
            case 5:
            case 6:
            case 7:
            case 8:
            case 9:
                if (voidMessage.isBlockingMessage()) {
                    sendMessageAndGetResponse(voidMessage);
                    return;
                } else {
                    sendCommandToShard(voidMessage);
                    return;
                }
            case 10:
            case 11:
            case 12:
            case 13:
            case 19:
                sendFeedbackToClient(voidMessage);
                return;
            case 14:
            case 15:
            case 16:
            case 17:
            case 18:
            case 23:
            case 24:
            case 25:
            case 26:
            case 27:
            default:
                throw new RuntimeException("Unknown messageType passed for delivery");
            case 20:
            case 21:
            case 22:
            case 28:
                sendCoordinationCommand(voidMessage);
                return;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void shardMessageHandler(DirectBuffer directBuffer, int i, int i2, Header header) {
        byte[] bArr = new byte[i2];
        directBuffer.getBytes(i, bArr);
        VoidMessage fromBytes = VoidMessage.fromBytes(bArr);
        if (fromBytes.getMessageType() == 7) {
            this.messages.add(fromBytes);
        } else {
            this.publicationForShards.offer(directBuffer, i, i2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void internalMessageHandler(DirectBuffer directBuffer, int i, int i2, Header header) {
        byte[] bArr = new byte[i2];
        directBuffer.getBytes(i, bArr);
        this.messages.add(VoidMessage.fromBytes(bArr));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void clientMessageHandler(DirectBuffer directBuffer, int i, int i2, Header header) {
        byte[] bArr = new byte[i2];
        directBuffer.getBytes(i, bArr);
        MeaningfulMessage meaningfulMessage = (MeaningfulMessage) VoidMessage.fromBytes(bArr);
        this.completed.put(Long.valueOf(meaningfulMessage.getTaskId()), meaningfulMessage);
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public void sendMessageToAllShards(VoidMessage voidMessage) {
        if (this.nodeRole != NodeRole.SHARD) {
            throw new RuntimeException("This method shouldn't be called only from Shard context");
        }
        voidMessage.setTargetId((short) -1);
        sendCoordinationCommand(voidMessage);
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public void init(VoidConfiguration voidConfiguration, Clipboard clipboard, NodeRole nodeRole, String str, int i, short s) {
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public void launch(@NonNull Transport.ThreadingModel threadingModel) {
        if (threadingModel == null) {
            throw new NullPointerException("threading");
        }
        this.threadingModel = threadingModel;
        switch (threadingModel) {
            case SINGLE_THREAD:
                log.warn("SINGLE_THREAD model is used, performance will be significantly reduced");
                this.threadA = new Thread(() -> {
                    while (this.runner.get()) {
                        if (this.subscriptionForShards != null) {
                            this.subscriptionForShards.poll(this.messageHandlerForShards, 512);
                        }
                        this.idler.idle(this.subscriptionForClients.poll(this.messageHandlerForClients, 512));
                    }
                });
                this.threadA.start();
                return;
            case DEDICATED_THREADS:
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                if (this.nodeRole == NodeRole.NONE) {
                    throw new ND4JIllegalStateException("No role is set for current node!");
                }
                if (this.nodeRole == NodeRole.SHARD || this.nodeRole == NodeRole.BACKUP || this.nodeRole == NodeRole.MASTER) {
                    if (this.messageHandlerForShards != null) {
                        this.threadB = new Thread(() -> {
                            while (this.runner.get()) {
                                this.idler.idle(this.subscriptionForShards.poll(this.messageHandlerForShards, 512));
                            }
                        });
                    }
                    this.threadA = new Thread(() -> {
                        atomicBoolean.set(true);
                        while (this.runner.get()) {
                            this.idler.idle(this.subscriptionForClients.poll(this.messageHandlerForClients, 512));
                        }
                    });
                    if (this.threadB != null) {
                        this.threadB.setDaemon(true);
                        this.threadB.setName("VoidParamServer subscription threadB [" + this.nodeRole + "]");
                        this.threadB.start();
                    }
                } else {
                    this.threadA = new Thread(() -> {
                        atomicBoolean.set(true);
                        while (this.runner.get()) {
                            this.idler.idle(this.subscriptionForClients.poll(this.messageHandlerForClients, 512));
                        }
                    });
                }
                this.threadA.setDaemon(true);
                this.threadA.setName("VoidParamServer subscription threadA [" + this.nodeRole + "]");
                this.threadA.start();
                while (!atomicBoolean.get()) {
                    try {
                        Thread.sleep(50L);
                    } catch (Exception e) {
                    }
                }
                return;
            case SAME_THREAD:
                log.warn("SAME_THREAD model is used, performance will be dramatically reduced");
                return;
            default:
                throw new IllegalStateException("Unknown thread model: [" + threadingModel.toString() + "]");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void shutdownSilent() {
        log.info("Shutting down Aeron infrastructure...");
        CloseHelper.quietClose(this.publicationForClients);
        CloseHelper.quietClose(this.publicationForShards);
        CloseHelper.quietClose(this.subscriptionForShards);
        CloseHelper.quietClose(this.subscriptionForClients);
        CloseHelper.quietClose(this.aeron);
        CloseHelper.quietClose(this.context);
        CloseHelper.quietClose(this.driver);
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public void shutdown() {
        this.runner.set(false);
        try {
            this.threadA.join();
            if (this.threadB != null) {
                this.threadB.join();
            }
        } catch (Exception e) {
        }
        CloseHelper.quietClose(this.driver);
        try {
            Thread.sleep(500L);
        } catch (Exception e2) {
        }
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public void receiveMessage(VoidMessage voidMessage) {
        try {
            log.info("Message received, saving...");
            this.messages.put(voidMessage);
        } catch (Exception e) {
        }
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public VoidMessage takeMessage() {
        if (this.threadingModel != Transport.ThreadingModel.SAME_THREAD) {
            try {
                return this.messages.take();
            } catch (InterruptedException e) {
                return null;
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }
        if (this.subscriptionForShards != null) {
            this.subscriptionForShards.poll(this.messageHandlerForShards, 512);
        }
        this.subscriptionForClients.poll(this.messageHandlerForClients, 512);
        return this.messages.poll();
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public void putMessage(@NonNull VoidMessage voidMessage) {
        if (voidMessage == null) {
            throw new NullPointerException("message");
        }
        this.messages.add(voidMessage);
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public VoidMessage peekMessage() {
        return this.messages.peek();
    }

    protected synchronized void sendCommandToShard(VoidMessage voidMessage) {
        if (this.nodeRole == NodeRole.SHARD) {
            voidMessage.setTargetId(this.shardIndex);
            this.messages.add(voidMessage);
            return;
        }
        voidMessage.setTargetId(this.targetIndex);
        UnsafeBuffer asUnsafeBuffer = voidMessage.asUnsafeBuffer();
        long offer = this.publicationForShards.offer(asUnsafeBuffer);
        if (offer < 0) {
            for (int i = 0; i < 5 && offer < 0; i++) {
                try {
                    Thread.sleep(1000L);
                } catch (Exception e) {
                }
                offer = this.publicationForShards.offer(asUnsafeBuffer);
            }
        }
        if (offer < 0) {
            throw new RuntimeException("Unable to send message over the wire. Error code: " + offer);
        }
    }

    protected abstract void sendCoordinationCommand(VoidMessage voidMessage);

    protected abstract void sendFeedbackToClient(VoidMessage voidMessage);

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public void addClient(String str, int i) {
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public String getIp() {
        return this.ip;
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public int getPort() {
        return this.port;
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public short getTargetIndex() {
        return this.targetIndex;
    }

    @Override // org.nd4j.parameterserver.distributed.transport.Transport
    public short getShardIndex() {
        return this.shardIndex;
    }
}
