package io.questdb.cutlass.line.tcp;

import io.questdb.Metrics;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.network.AbstractMutableIOContext;
import io.questdb.network.NetworkFacade;
import io.questdb.std.Unsafe;
import io.questdb.std.Vect;
import io.questdb.std.datetime.millitime.MillisecondClock;
import io.questdb.std.str.DirectByteCharSequence;

/* loaded from: input_file:io/questdb/cutlass/line/tcp/LineTcpConnectionContext.class */
class LineTcpConnectionContext extends AbstractMutableIOContext<LineTcpConnectionContext> {
    private static final Log LOG;
    private static final long QUEUE_FULL_LOG_HYSTERESIS_IN_MS = 10000;
    protected final NetworkFacade nf;
    private final boolean disconnectOnError;
    private final Metrics metrics;
    private final MillisecondClock milliClock;
    private final LineTcpParser parser;
    private final LineTcpMeasurementScheduler scheduler;
    protected boolean peerDisconnected;
    protected long recvBufEnd;
    protected long recvBufPos;
    protected long recvBufStart;
    protected long recvBufStartOfMeasurement;
    private boolean goodMeasurement;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final DirectByteCharSequence byteCharSequence = new DirectByteCharSequence();
    private long lastQueueFullLogMillis = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/questdb/cutlass/line/tcp/LineTcpConnectionContext$IOContextResult.class */
    public enum IOContextResult {
        NEEDS_READ,
        NEEDS_WRITE,
        QUEUE_FULL,
        NEEDS_DISCONNECT
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LineTcpConnectionContext(LineTcpReceiverConfiguration lineTcpReceiverConfiguration, LineTcpMeasurementScheduler lineTcpMeasurementScheduler, Metrics metrics) {
        this.nf = lineTcpReceiverConfiguration.getNetworkFacade();
        this.disconnectOnError = lineTcpReceiverConfiguration.getDisconnectOnError();
        this.scheduler = lineTcpMeasurementScheduler;
        this.metrics = metrics;
        this.milliClock = lineTcpReceiverConfiguration.getMillisecondClock();
        this.parser = new LineTcpParser(lineTcpReceiverConfiguration.isStringAsTagSupported(), lineTcpReceiverConfiguration.isSymbolAsFieldSupported());
        this.recvBufStart = Unsafe.malloc(lineTcpReceiverConfiguration.getNetMsgBufferSize(), 46);
        this.recvBufEnd = this.recvBufStart + lineTcpReceiverConfiguration.getNetMsgBufferSize();
        clear();
    }

    @Override // io.questdb.network.AbstractMutableIOContext, io.questdb.std.Mutable
    public void clear() {
        this.recvBufPos = this.recvBufStart;
        this.peerDisconnected = false;
        resetParser();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r3v2, types: [io.questdb.cutlass.line.tcp.LineTcpConnectionContext, long] */
    @Override // io.questdb.network.IOContext, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.fd = -1L;
        ?? free = Unsafe.free(this.recvBufStart, this.recvBufEnd - this.recvBufStart, 46);
        this.recvBufPos = free;
        this.recvBufEnd = free;
        free.recvBufStart = this;
    }

    private boolean checkQueueFullLogHysteresis() {
        long ticks = this.milliClock.getTicks();
        if (ticks - this.lastQueueFullLogMillis < QUEUE_FULL_LOG_HYSTERESIS_IN_MS) {
            return false;
        }
        this.lastQueueFullLogMillis = ticks;
        return true;
    }

    private void doHandleDisconnectEvent() {
        if (this.parser.getBufferAddress() == this.recvBufEnd) {
            LOG.error().$('[').$(this.fd).$((CharSequence) "] buffer overflow [line.tcp.msg.buffer.size=").$(this.recvBufEnd - this.recvBufStart).$(']').$();
        } else if (this.peerDisconnected) {
            if (this.recvBufPos != this.recvBufStart) {
                LOG.info().$('[').$(this.fd).$((CharSequence) "] peer disconnected with partial measurement, ").$(this.recvBufPos - this.recvBufStart).$((CharSequence) " unprocessed bytes").$();
            } else {
                LOG.info().$('[').$(this.fd).$((CharSequence) "] peer disconnected").$();
            }
        }
    }

    private void logParseError() {
        int bufferAddress = (int) (this.parser.getBufferAddress() - this.recvBufStartOfMeasurement);
        if (!$assertionsDisabled && bufferAddress < 0) {
            throw new AssertionError();
        }
        LOG.error().$('[').$(this.fd).$((CharSequence) "] could not parse measurement, ").$(this.parser.getErrorCode()).$((CharSequence) " at ").$(bufferAddress).$((CharSequence) ", line (may be mangled due to partial parsing): '").$((CharSequence) this.byteCharSequence.of(this.recvBufStartOfMeasurement, this.parser.getBufferAddress())).$((CharSequence) "'").$();
    }

    private void startNewMeasurement() {
        this.parser.startNextMeasurement();
        this.recvBufStartOfMeasurement = this.parser.getBufferAddress();
        if (this.recvBufStartOfMeasurement == this.recvBufPos) {
            this.recvBufPos = this.recvBufStart;
            this.parser.of(this.recvBufStart);
            this.recvBufStartOfMeasurement = this.recvBufStart;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean compactBuffer(long j) {
        if (!$assertionsDisabled && j > this.recvBufPos) {
            throw new AssertionError();
        }
        if (j <= this.recvBufStart) {
            return false;
        }
        long j2 = this.recvBufPos - j;
        if (j2 > 0) {
            Vect.memmove(this.recvBufStart, j, j2);
            long j3 = j - this.recvBufStart;
            this.parser.shl(j3);
            this.recvBufStartOfMeasurement -= j3;
        } else {
            if (!$assertionsDisabled && j2 != 0) {
                throw new AssertionError();
            }
            resetParser();
        }
        this.recvBufPos = this.recvBufStart + j2;
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public IOContextResult handleIO(NetworkIOJob networkIOJob) {
        read();
        return parseMeasurements(networkIOJob);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Code restructure failed: missing block: B:49:0x0000, code lost:
    
        continue;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public final io.questdb.cutlass.line.tcp.LineTcpConnectionContext.IOContextResult parseMeasurements(io.questdb.cutlass.line.tcp.NetworkIOJob r6) {
        /*
            Method dump skipped, instructions count: 412
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.questdb.cutlass.line.tcp.LineTcpConnectionContext.parseMeasurements(io.questdb.cutlass.line.tcp.NetworkIOJob):io.questdb.cutlass.line.tcp.LineTcpConnectionContext$IOContextResult");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean read() {
        int i = (int) (this.recvBufEnd - this.recvBufPos);
        if (i <= 0 || this.peerDisconnected) {
            return !this.peerDisconnected;
        }
        int recv = this.nf.recv(this.fd, this.recvBufPos, i);
        if (recv > 0) {
            this.recvBufPos += recv;
            i -= recv;
        } else {
            this.peerDisconnected = recv < 0;
        }
        return i < i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void resetParser() {
        this.parser.of(this.recvBufStart);
        this.goodMeasurement = true;
        this.recvBufStartOfMeasurement = this.recvBufStart;
    }

    static {
        $assertionsDisabled = !LineTcpConnectionContext.class.desiredAssertionStatus();
        LOG = LogFactory.getLog((Class<?>) LineTcpConnectionContext.class);
    }
}
