package org.apache.shardingsphere.data.pipeline.core.importer;

import java.util.List;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.class */
public final class SingleChannelConsumerImporter extends AbstractPipelineLifecycleRunnable implements Importer {
    private final PipelineChannel channel;
    private final int batchSize;
    private final long timeoutMillis;
    private final PipelineSink sink;
    private final PipelineJobProgressListener jobProgressListener;

    @Override // org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable
    protected void runBlocking() {
        while (isRunning()) {
            List<Record> fetch = this.channel.fetch(this.batchSize, this.timeoutMillis);
            if (!fetch.isEmpty()) {
                PipelineJobUpdateProgress write = this.sink.write("", fetch);
                this.channel.ack(fetch);
                this.jobProgressListener.onProgressUpdated(write);
                if (FinishedRecord.class.equals(fetch.get(fetch.size() - 1).getClass())) {
                    return;
                }
            }
        }
    }

    @Override // org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable
    protected void doStop() {
        QuietlyCloser.close(this.sink);
    }

    @Generated
    public SingleChannelConsumerImporter(PipelineChannel pipelineChannel, int i, long j, PipelineSink pipelineSink, PipelineJobProgressListener pipelineJobProgressListener) {
        this.channel = pipelineChannel;
        this.batchSize = i;
        this.timeoutMillis = j;
        this.sink = pipelineSink;
        this.jobProgressListener = pipelineJobProgressListener;
    }
}
