package io.datarouter.conveyor.queue;

import io.datarouter.conveyor.BaseConveyor;
import io.datarouter.conveyor.ConveyorCounters;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.storage.queue.QueueMessage;
import io.datarouter.storage.setting.Setting;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/conveyor/queue/BaseBatchedLossyQueueConsumerConveyor.class */
public abstract class BaseBatchedLossyQueueConsumerConveyor<PK extends PrimaryKey<PK>, D extends Databean<PK, D>> extends BaseConveyor {
    private static final Logger logger = LoggerFactory.getLogger(BaseBatchedLossyQueueConsumerConveyor.class);
    private static final Duration PEEK_TIMEOUT = Duration.ofSeconds(5);
    private static final int BATCH_SIZE = 100;
    private final QueueConsumer<PK, D> queueConsumer;
    private final Object lock;
    private List<D> buffer;

    public BaseBatchedLossyQueueConsumerConveyor(String str, Setting<Boolean> setting, QueueConsumer<PK, D> queueConsumer) {
        super(str, setting, () -> {
            return false;
        });
        this.lock = new Object();
        this.queueConsumer = queueConsumer;
        this.buffer = new ArrayList(BATCH_SIZE);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v17 */
    /* JADX WARN: Type inference failed for: r0v30, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v31, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v34 */
    /* JADX WARN: Type inference failed for: r0v6, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v7, types: [java.lang.Throwable] */
    @Override // io.datarouter.conveyor.BaseConveyor
    public BaseConveyor.ProcessBatchResult processBatch() {
        List<D> emptyList = Collections.emptyList();
        QueueMessage<PK, D> peek = this.queueConsumer.peek(PEEK_TIMEOUT);
        if (peek == null) {
            logger.info("peeked conveyor={} nullMessage", this.name);
            ?? r0 = this.lock;
            synchronized (r0) {
                List<D> copyAndClearBuffer = copyAndClearBuffer();
                r0 = r0;
                flushBuffer(copyAndClearBuffer);
                return new BaseConveyor.ProcessBatchResult(false);
            }
        }
        ?? r02 = this.lock;
        synchronized (r02) {
            Databean databean = peek.getDatabean();
            logger.info("peeked conveyor={} messageCount={}", this.name, 1);
            this.buffer.add(databean);
            if (this.buffer.size() >= BATCH_SIZE) {
                emptyList = copyAndClearBuffer();
            }
            r02 = r02;
            logger.info("consumed conveyor={} messageCount={}", this.name, 1);
            ConveyorCounters.incConsumedOpAndDatabeans(this, 1L);
            this.queueConsumer.ack(peek.getKey());
            logger.info("acked conveyor={} messageCount={}", this.name, 1);
            ConveyorCounters.incAck(this);
            flushBuffer(emptyList);
            return new BaseConveyor.ProcessBatchResult(true);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    @Override // io.datarouter.conveyor.BaseConveyor
    public void interrupted() {
        ?? r0 = this.lock;
        synchronized (r0) {
            List<D> copyAndClearBuffer = copyAndClearBuffer();
            r0 = r0;
            flushBuffer(copyAndClearBuffer);
        }
    }

    private void flushBuffer(List<D> list) {
        if (list.isEmpty()) {
            return;
        }
        processBuffer(list);
        ConveyorCounters.incFlushBuffer(this, list.size());
    }

    private List<D> copyAndClearBuffer() {
        List<D> list = (List) this.buffer.stream().collect(Collectors.toList());
        this.buffer.clear();
        return list;
    }

    protected abstract void processBuffer(List<D> list);
}
