package io.questdb.cutlass.line.udp;

import io.questdb.cairo.CairoEngine;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.SOCountDownLatch;
import io.questdb.mp.SynchronizedJob;
import io.questdb.mp.WorkerPool;
import io.questdb.network.NetworkError;
import io.questdb.network.NetworkFacade;
import io.questdb.std.Misc;
import io.questdb.std.Os;
import io.questdb.std.str.Path;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:io/questdb/cutlass/line/udp/AbstractLineProtoUdpReceiver.class */
public abstract class AbstractLineProtoUdpReceiver extends SynchronizedJob implements Closeable {
    private static final Log LOG = LogFactory.getLog((Class<?>) AbstractLineProtoUdpReceiver.class);
    protected final int commitMode;
    protected final LineUdpLexer lexer;
    protected final NetworkFacade nf;
    protected final LineUdpParserImpl parser;
    private final LineUdpReceiverConfiguration configuration;
    protected int commitRate;
    protected long fd;
    private final SOCountDownLatch halted = new SOCountDownLatch(1);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final SOCountDownLatch started = new SOCountDownLatch(1);
    protected long totalCount = 0;

    public AbstractLineProtoUdpReceiver(LineUdpReceiverConfiguration lineUdpReceiverConfiguration, CairoEngine cairoEngine, WorkerPool workerPool) {
        this.configuration = lineUdpReceiverConfiguration;
        this.commitMode = lineUdpReceiverConfiguration.getCommitMode();
        this.nf = lineUdpReceiverConfiguration.getNetworkFacade();
        this.fd = this.nf.socketUdp();
        if (this.fd < 0) {
            int errno = this.nf.errno();
            LOG.error().$((CharSequence) "cannot open UDP socket [errno=").$(errno).$(']').$();
            throw NetworkError.instance(errno, "Cannot open UDP socket");
        }
        try {
            bind(lineUdpReceiverConfiguration);
            this.commitRate = lineUdpReceiverConfiguration.getCommitRate();
            if (lineUdpReceiverConfiguration.getReceiveBufferSize() != -1 && this.nf.setRcvBuf(this.fd, lineUdpReceiverConfiguration.getReceiveBufferSize()) != 0) {
                LOG.error().$((CharSequence) "could not set receive buffer size [fd=").$(this.fd).$((CharSequence) ", size=").$(lineUdpReceiverConfiguration.getReceiveBufferSize()).$((CharSequence) ", errno=").$(lineUdpReceiverConfiguration.getNetworkFacade().errno()).I$();
            }
            this.lexer = new LineUdpLexer(lineUdpReceiverConfiguration.getMsgBufferSize());
            this.parser = new LineUdpParserImpl(cairoEngine, lineUdpReceiverConfiguration);
            this.lexer.withParser(this.parser);
            if (!lineUdpReceiverConfiguration.ownThread()) {
                workerPool.assign(this);
                logStarted(lineUdpReceiverConfiguration);
            }
        } catch (Throwable th) {
            close();
            throw th;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.fd > -1) {
            if (this.running.compareAndSet(true, false)) {
                this.started.await();
                this.halted.await();
            }
            if (this.nf.close(this.fd) != 0) {
                LOG.error().$((CharSequence) "could not close [fd=").$(this.fd).$((CharSequence) ", errno=").$(this.nf.errno()).$(']').$();
            } else {
                LOG.info().$((CharSequence) "closed [fd=").$(this.fd).$(']').$();
            }
            if (this.parser != null) {
                this.parser.commitAll(this.commitMode);
                this.parser.close();
            }
            Misc.free(this.lexer);
            LOG.info().$((CharSequence) "closed [fd=").$(this.fd).$(']').$();
            this.fd = -1L;
        }
    }

    public void start() {
        if (this.configuration.ownThread() && this.running.compareAndSet(false, true)) {
            new Thread(() -> {
                this.started.countDown();
                if (this.configuration.ownThreadAffinity() != -1) {
                    Os.setCurrentThreadAffinity(this.configuration.ownThreadAffinity());
                }
                logStarted(this.configuration);
                while (this.running.get()) {
                    runSerially();
                }
                LOG.info().$((CharSequence) "shutdown").$();
                Path.clearThreadLocals();
                this.halted.countDown();
            }).start();
        }
    }

    private void bind(LineUdpReceiverConfiguration lineUdpReceiverConfiguration) {
        if (!this.nf.bindUdp(this.fd, lineUdpReceiverConfiguration.isUnicast() ? lineUdpReceiverConfiguration.getBindIPv4Address() : 0, lineUdpReceiverConfiguration.getPort())) {
            throw NetworkError.instance(this.nf.errno()).couldNotBindSocket("udp-line-server", lineUdpReceiverConfiguration.getBindIPv4Address(), lineUdpReceiverConfiguration.getPort());
        }
        if (!lineUdpReceiverConfiguration.isUnicast() && !this.nf.join(this.fd, lineUdpReceiverConfiguration.getBindIPv4Address(), lineUdpReceiverConfiguration.getGroupIPv4Address())) {
            throw NetworkError.instance(this.nf.errno()).put("cannot join group ").put("[fd=").put(this.fd).put(", bind=").ip(lineUdpReceiverConfiguration.getBindIPv4Address()).put(", group=").ip(lineUdpReceiverConfiguration.getGroupIPv4Address()).put(']');
        }
    }

    private void logStarted(LineUdpReceiverConfiguration lineUdpReceiverConfiguration) {
        if (lineUdpReceiverConfiguration.isUnicast()) {
            LOG.info().$((CharSequence) "receiving unicast on ").$ip(lineUdpReceiverConfiguration.getBindIPv4Address()).$(':').$(lineUdpReceiverConfiguration.getPort()).$((CharSequence) " [fd=").$(this.fd).$((CharSequence) ", commitRate=").$(this.commitRate).I$();
        } else {
            LOG.info().$((CharSequence) "receiving multicast from ").$ip(lineUdpReceiverConfiguration.getGroupIPv4Address()).$(':').$(lineUdpReceiverConfiguration.getPort()).$((CharSequence) " via ").$ip(lineUdpReceiverConfiguration.getBindIPv4Address()).$((CharSequence) " [fd=").$(this.fd).$((CharSequence) ", commitRate=").$(this.commitRate).I$();
        }
    }
}
