package io.datarouter.conveyor.trace.conveyor;

import io.datarouter.conveyor.Conveyor;
import io.datarouter.conveyor.ConveyorConfiguration;
import io.datarouter.conveyor.ConveyorCounters;
import io.datarouter.conveyor.ConveyorGauges;
import io.datarouter.conveyor.ConveyorRunnable;
import io.datarouter.instrumentation.exception.DatarouterExceptionPublisher;
import io.datarouter.instrumentation.exception.HttpRequestRecordBatchDto;
import io.datarouter.instrumentation.trace.TraceBatchedBundleDto;
import io.datarouter.instrumentation.trace.TraceBundleAndHttpRequestRecordDto;
import io.datarouter.instrumentation.trace.TracePublisher;
import io.datarouter.instrumentation.trace.TracerTool;
import io.datarouter.scanner.Scanner;
import io.datarouter.trace.conveyor.TraceBuffers;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Conveyor configuration that drains trace entries from the in-memory {@link TraceBuffers} buffer and hands them to
 * the {@link TracePublisher}. Any non-null HTTP request records attached to those entries are handed to the
 * {@link DatarouterExceptionPublisher}.
 */
@Singleton
public class TraceMemoryToPublisherConveyorConfiguration implements ConveyorConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(TraceMemoryToPublisherConveyorConfiguration.class);

    /** Upper bound on the number of buffered entries polled per {@link #process} call. */
    private static final int BATCH_SIZE = 500;

    @Inject
    private TraceBuffers traceBuffers;

    @Inject
    private TracePublisher tracePublisher;

    @Inject
    private DatarouterExceptionPublisher exceptionPublisher;

    @Inject
    private ConveyorGauges gaugeRecorder;

    /**
     * Polls up to {@link #BATCH_SIZE} entries from the buffer, records how long the poll took, and publishes
     * whatever was polled.
     *
     * @param conveyorRunnable the conveyor the gauge and counters are recorded against
     * @return a result built with {@code false} when nothing was polled, with "the batch was full" when publishing
     *         succeeded, and with {@code true} when publishing threw a {@link RuntimeException}
     */
    @Override
    public Conveyor.ProcessResult process(ConveyorRunnable conveyorRunnable) {
        Instant pollStart = Instant.now();
        List<TraceBundleAndHttpRequestRecordDto> polled = traceBuffers.buffer.pollMultiWithLimit(BATCH_SIZE);
        long pollMillis = Duration.between(pollStart, Instant.now()).toMillis();
        gaugeRecorder.savePeekDurationMs(conveyorRunnable, pollMillis);
        TracerTool.setAlternativeStartTime();
        if (polled.isEmpty()) {
            return new Conveyor.ProcessResult(false);
        }
        try {
            processTraceEntityDtos(polled);
            int publishedCount = polled.size();
            ConveyorCounters.incPutMultiOpAndDatabeans(conveyorRunnable, publishedCount);
            boolean batchWasFull = publishedCount == BATCH_SIZE;
            return new Conveyor.ProcessResult(batchWasFull);
        } catch (RuntimeException publishFailure) {
            // The failure is logged and counted rather than rethrown; the polled entries are not put back here.
            logger.warn(
                    "exception sending trace to sqs ids={}",
                    Scanner.of(polled)
                            .map(TraceBundleAndHttpRequestRecordDto::getTraceparent)
                            .list(),
                    publishFailure);
            ConveyorCounters.inc(conveyorRunnable, "putMulti exception", 1L);
            return new Conveyor.ProcessResult(true);
        }
    }

    /**
     * Publishes the trace bundles of the given entries as one batch, then publishes their non-null HTTP request
     * records as one batch.
     *
     * @param entries the polled buffer entries to publish
     */
    private void processTraceEntityDtos(List<TraceBundleAndHttpRequestRecordDto> entries) {
        Scanner.of(entries)
                .map(entry -> entry.traceBundleDto)
                .flush(bundles -> tracePublisher.addBatch(new TraceBatchedBundleDto(bundles)));
        Scanner.of(entries)
                .map(entry -> entry.httpRequestRecord)
                .include(Objects::nonNull)
                .flush(records -> exceptionPublisher.addHttpRequestRecord(new HttpRequestRecordBatchDto(records)));
    }

    /** @return always {@code true} */
    @Override
    public boolean shouldRunOnShutdown() {
        return true;
    }

    /** @return a fixed duration of five seconds */
    @Override
    public Duration delay() {
        return Duration.ofSeconds(5L);
    }
}
