package io.datarouter.conveyor.queue;

import io.datarouter.conveyor.BaseConveyor;
import io.datarouter.conveyor.ConveyorCounters;
import io.datarouter.conveyor.ConveyorGaugeRecorder;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.queue.BlobQueueMessage;
import io.datarouter.web.exception.ExceptionRecorder;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/conveyor/queue/BaseBlobQueueConsumerConveyor.class */
public abstract class BaseBlobQueueConsumerConveyor<T> extends BaseConveyor {
    private static final Logger logger = LoggerFactory.getLogger(BaseBlobQueueConsumerConveyor.class);
    private static final Duration PEEK_TIMEOUT = Duration.ofSeconds(30);
    private static final Duration VISIBILITY_TIMEOUT = Duration.ofSeconds(30);
    private final BlobQueueConsumer<T> queueConsumer;

    protected BaseBlobQueueConsumerConveyor(String str, Supplier<Boolean> supplier, ExceptionRecorder exceptionRecorder, BlobQueueConsumer<T> blobQueueConsumer, ConveyorGaugeRecorder conveyorGaugeRecorder) {
        super(str, supplier, () -> {
            return false;
        }, exceptionRecorder, conveyorGaugeRecorder);
        this.queueConsumer = blobQueueConsumer;
    }

    @Override // io.datarouter.conveyor.BaseConveyor
    public BaseConveyor.ProcessBatchResult processBatch() {
        Duration visibilityTimeout = getVisibilityTimeout();
        Instant now = Instant.now();
        Optional<BlobQueueMessage<T>> peek = this.queueConsumer.peek(PEEK_TIMEOUT, visibilityTimeout);
        this.gaugeRecorder.savePeekDurationMs(this, Duration.between(now, Instant.now()).toMillis());
        if (peek.isEmpty()) {
            logger.info("peeked conveyor={} nullMessage", this.name);
            return new BaseConveyor.ProcessBatchResult(false);
        }
        BlobQueueMessage<T> blobQueueMessage = peek.get();
        logger.info("peeked conveyor={} messageCount={}", this.name, 1);
        Instant now2 = Instant.now();
        try {
            if (!processOneShouldAck(blobQueueMessage.scanSplitDecodedData())) {
                return new BaseConveyor.ProcessBatchResult(true);
            }
            Instant now3 = Instant.now();
            this.gaugeRecorder.saveProcessBufferDurationMs(this, Duration.between(now2, now3).toMillis());
            if (Duration.between(now2, now3).toMillis() > visibilityTimeout.toMillis()) {
                logger.warn("slow conveyor conveyor={} durationMs={}", this.name, Long.valueOf(Duration.between(now2, now3).toMillis()));
            }
            logger.info("consumed conveyor={} messageCount={}", this.name, 1);
            ConveyorCounters.incConsumedOpAndDatabeans(this, 1L);
            Instant now4 = Instant.now();
            this.queueConsumer.ack(blobQueueMessage);
            this.gaugeRecorder.saveAckDurationMs(this, Duration.between(now4, Instant.now()).toMillis());
            logger.info("acked conveyor={} messageCount={}", this.name, 1);
            ConveyorCounters.incAck(this);
            return new BaseConveyor.ProcessBatchResult(true);
        } catch (Exception e) {
            throw new RuntimeException("failed to process message", e);
        }
    }

    protected Duration getVisibilityTimeout() {
        return VISIBILITY_TIMEOUT;
    }

    protected boolean processOneShouldAck(Scanner<T> scanner) {
        processOne(scanner);
        return true;
    }

    protected abstract void processOne(Scanner<T> scanner);
}
