package io.questdb.cutlass.line.tcp;

import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.network.IODispatcher;
import io.questdb.network.IORequestProcessor;
import io.questdb.std.DirectByteCharSequenceObjHashMap;
import io.questdb.std.Misc;
import io.questdb.std.ObjList;
import io.questdb.std.datetime.millitime.MillisecondClock;
import io.questdb.std.str.DirectByteCharSequence;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/questdb/cutlass/line/tcp/LineTcpNetworkIOJob.class */
/**
 * Network I/O job for line-protocol TCP connections.
 *
 * <p>Each {@link #run(int)} pass does three things, in order:
 * <ol>
 *   <li>re-drives the single connection context (if any) whose last I/O attempt
 *       ended with {@code QUEUE_FULL};</li>
 *   <li>drains the dispatcher's I/O queue through {@link #onRequest(int, LineTcpConnectionContext)};</li>
 *   <li>once the maintenance deadline has passed, asks the scheduler to perform
 *       maintenance over the table details registered with this job.</li>
 * </ol>
 *
 * <p>The job also owns a per-job map of table update details and a list of unused
 * symbol caches; the latter is freed in {@link #close()}.
 */
public class LineTcpNetworkIOJob implements NetworkIOJob {
    private static final Log LOG = LogFactory.getLog(LineTcpNetworkIOJob.class);

    private final IODispatcher<LineTcpConnectionContext> dispatcher;
    // Milliseconds added to the current tick to compute the next maintenance deadline.
    private final long maintenanceInterval;
    private final MillisecondClock millisecondClock;
    private final LineTcpMeasurementScheduler scheduler;
    private final int workerId;
    // Clock tick after which the next run() will attempt scheduler maintenance.
    private long maintenanceJobDeadline;
    // Table update details registered with this job, looked up by table name.
    private final DirectByteCharSequenceObjHashMap<TableUpdateDetails> tableUpdateDetailsUtf8 = new DirectByteCharSequenceObjHashMap<>();
    private final ObjList<SymbolCache> unusedSymbolCaches = new ObjList<>();
    // Context whose last handleIO() reported QUEUE_FULL; retried first on the next run().
    private LineTcpConnectionContext busyContext = null;
    // Callback handed to the dispatcher when draining its I/O queue.
    private final IORequestProcessor<LineTcpConnectionContext> onRequest = this::onRequest;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the job and schedules the first maintenance one interval from now.
     *
     * @param lineTcpReceiverConfiguration supplies the millisecond clock and the maintenance interval
     * @param lineTcpMeasurementScheduler  scheduler asked to perform maintenance from {@link #run(int)}
     * @param iODispatcher                 dispatcher whose I/O queue this job drains
     * @param i                            worker id this job is bound to; {@link #run(int)} asserts it is called with the same id
     */
    public LineTcpNetworkIOJob(LineTcpReceiverConfiguration lineTcpReceiverConfiguration, LineTcpMeasurementScheduler lineTcpMeasurementScheduler, IODispatcher<LineTcpConnectionContext> iODispatcher, int i) {
        this.millisecondClock = lineTcpReceiverConfiguration.getMillisecondClock();
        this.maintenanceInterval = lineTcpReceiverConfiguration.getMaintenanceInterval();
        this.scheduler = lineTcpMeasurementScheduler;
        this.maintenanceJobDeadline = this.millisecondClock.getTicks() + this.maintenanceInterval;
        this.dispatcher = iODispatcher;
        this.workerId = i;
    }

    /**
     * Registers table update details under the given name and records this job's
     * worker id as a reference holder on them.
     *
     * @param str                table name used as the map key
     * @param tableUpdateDetails details to register
     */
    @Override // io.questdb.cutlass.line.tcp.NetworkIOJob
    public void addTableUpdateDetails(String str, TableUpdateDetails tableUpdateDetails) {
        tableUpdateDetailsUtf8.put(str, tableUpdateDetails);
        tableUpdateDetails.addReference(workerId);
    }

    /**
     * Disconnects the context still waiting on a full queue, if there is one, and
     * frees the unused symbol caches.
     */
    @Override // io.questdb.cutlass.line.tcp.NetworkIOJob, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        final LineTcpConnectionContext pending = busyContext;
        if (pending != null) {
            // NOTE(review): 6 is a dispatcher disconnect-reason code; presumably "retry failed" - confirm against IODispatcher.
            pending.getDispatcher().disconnect(pending, 6);
            busyContext = null;
        }
        Misc.freeObjList(unusedSymbolCaches);
    }

    /**
     * Looks up the table update details registered with this job.
     *
     * @param directByteCharSequence table name to look up
     * @return whatever the underlying map returns for that key
     */
    @Override // io.questdb.cutlass.line.tcp.NetworkIOJob
    public TableUpdateDetails getLocalTableDetails(DirectByteCharSequence directByteCharSequence) {
        return tableUpdateDetailsUtf8.get(directByteCharSequence);
    }

    /**
     * @return the live list of unused symbol caches owned by this job (not a copy)
     */
    @Override // io.questdb.cutlass.line.tcp.NetworkIOJob
    public ObjList<SymbolCache> getUnusedSymbolCaches() {
        return unusedSymbolCaches;
    }

    /**
     * @return the worker id passed to the constructor
     */
    @Override // io.questdb.cutlass.line.tcp.NetworkIOJob
    public int getWorkerId() {
        return workerId;
    }

    /**
     * Performs one pass of I/O work and, when due, scheduler maintenance.
     *
     * @param i id of the calling worker; must equal this job's worker id (checked with {@code assert})
     * @return {@code true} if the pending context is still blocked on a full queue, or if any of
     *         the steps in this pass reported work; {@code false} otherwise
     */
    @Override // io.questdb.mp.Job
    public boolean run(int i) {
        assert workerId == i;

        boolean busy = false;
        if (busyContext != null) {
            if (handleIO(busyContext)) {
                // Still QUEUE_FULL: do not take on more I/O until this context makes progress.
                return true;
            }
            LOG.debug().$((CharSequence) "context is no longer waiting on a full queue [fd=").$(busyContext.getFd()).$(']').$();
            busyContext = null;
            busy = true;
        }

        if (dispatcher.processIOQueue(onRequest)) {
            busy = true;
        }

        final long now = millisecondClock.getTicks();
        if (now > maintenanceJobDeadline) {
            final boolean maintenanceBusy = scheduler.doMaintenance(tableUpdateDetailsUtf8, i, now);
            if (!maintenanceBusy) {
                // Only push the deadline out when maintenance reported nothing left to do;
                // otherwise it is attempted again on the next pass.
                maintenanceJobDeadline = now + maintenanceInterval;
            }
            // Merge rather than overwrite, so I/O work done earlier in this pass is still reported.
            busy |= maintenanceBusy;
        }
        return busy;
    }

    /**
     * Drives I/O on one context and acts on the outcome by re-registering the
     * channel or disconnecting it.
     *
     * @param lineTcpConnectionContext context to drive
     * @return {@code true} only when the context reported {@code QUEUE_FULL}; {@code false} for
     *         an invalid context and for every other outcome
     */
    private boolean handleIO(LineTcpConnectionContext lineTcpConnectionContext) {
        if (lineTcpConnectionContext.invalid()) {
            return false;
        }
        switch (lineTcpConnectionContext.handleIO(this)) {
            case QUEUE_FULL:
                return true;
            case NEEDS_READ:
                // NOTE(review): 1 is presumably the dispatcher's "read" operation code - confirm.
                lineTcpConnectionContext.getDispatcher().registerChannel(lineTcpConnectionContext, 1);
                break;
            case NEEDS_WRITE:
                // NOTE(review): 4 is presumably the dispatcher's "write" operation code - confirm.
                lineTcpConnectionContext.getDispatcher().registerChannel(lineTcpConnectionContext, 4);
                break;
            case NEEDS_DISCONNECT:
                // NOTE(review): 0 is a dispatcher disconnect-reason code; meaning not visible here - confirm.
                lineTcpConnectionContext.getDispatcher().disconnect(lineTcpConnectionContext, 0);
                break;
            default:
                break;
        }
        return false;
    }

    /**
     * Dispatcher callback: drives I/O on the context and, if its queue is full,
     * remembers it so the next {@link #run(int)} retries it first.
     *
     * @param i                        operation code supplied by the dispatcher (unused here)
     * @param lineTcpConnectionContext context the dispatcher is reporting on
     */
    private void onRequest(int i, LineTcpConnectionContext lineTcpConnectionContext) {
        if (!handleIO(lineTcpConnectionContext)) {
            return;
        }
        busyContext = lineTcpConnectionContext;
        LOG.debug().$((CharSequence) "context is waiting on a full queue [fd=").$(lineTcpConnectionContext.getFd()).$(']').$();
    }
}
