package org.apache.rocketmq.streams.common.channel.source;

import com.alibaba.fastjson.JSONObject;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.BatchMessageOffset;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.common.utils.RuntimeUtil;

/* loaded from: input_file:org/apache/rocketmq/streams/common/channel/source/AbstractBatchSource.class */
public abstract class AbstractBatchSource extends AbstractSource {
    private static final Long MAX_BATCH_SIZE = 10000000000L;
    private static final String BATCH_MESSAGE_QUEUE_ID = "1";
    protected transient ScheduledExecutorService scheduled;
    protected transient AtomicLong offsetGenerator;
    protected transient long lastCommitTime;
    private transient BatchMessageOffset progress;

    public AbstractBatchSource() {
        setBatchMessage(true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource, org.apache.rocketmq.streams.common.configurable.AbstractConfigurable
    public boolean initConfigurable() {
        this.scheduled = new ScheduledThreadPoolExecutor(2);
        this.offsetGenerator = new AtomicLong(System.currentTimeMillis());
        System.currentTimeMillis();
        return super.initConfigurable();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    public boolean startSource() {
        final String queueId = getQueueId();
        this.scheduled.scheduleWithFixedDelay(new Runnable() { // from class: org.apache.rocketmq.streams.common.channel.source.AbstractBatchSource.1
            @Override // java.lang.Runnable
            public void run() {
                if (System.currentTimeMillis() - AbstractBatchSource.this.lastCommitTime > AbstractBatchSource.this.getCheckpointTime()) {
                    AbstractBatchSource.this.lastCommitTime = System.currentTimeMillis();
                    AbstractBatchSource.this.sendCheckpoint(queueId);
                }
            }
        }, 0L, getCheckpointTime(), TimeUnit.SECONDS);
        return true;
    }

    public AbstractContext doReceiveMessage(JSONObject jSONObject) {
        return doReceiveMessage(jSONObject, false);
    }

    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    public JSONObject createJson(Object obj) {
        return (this.isJsonData.booleanValue() && JSONObject.class.isInstance(obj)) ? (JSONObject) obj : super.createJson(obj);
    }

    public AbstractContext doReceiveMessage(String str, boolean z) {
        return doReceiveMessage(str, z, getQueueId(), this.offsetGenerator.incrementAndGet() + "");
    }

    public AbstractContext doReceiveMessage(JSONObject jSONObject, boolean z) {
        return doReceiveMessage(jSONObject, z, getQueueId(), this.offsetGenerator.incrementAndGet() + "");
    }

    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    public boolean supportNewSplitFind() {
        return false;
    }

    public void setProgress(JSONObject jSONObject) {
        BatchMessageOffset batchMessageOffset = new BatchMessageOffset();
        if (jSONObject != null) {
            batchMessageOffset.setCurrentMessage(jSONObject.toJSONString());
        }
        batchMessageOffset.setOwnerType(getType());
        this.progress = batchMessageOffset;
    }

    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource, org.apache.rocketmq.streams.common.topology.builder.IStageBuilder
    public void addConfigurables(PipelineBuilder pipelineBuilder) {
        super.addConfigurables(pipelineBuilder);
        if (this.progress != null) {
            pipelineBuilder.addConfigurables(this.progress);
        }
    }

    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    public AbstractContext doReceiveMessage(JSONObject jSONObject, boolean z, String str, String str2) {
        return executeMessage(createMessage(jSONObject, str, str2, z));
    }

    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    protected boolean isNotDataSplit(String str) {
        return false;
    }

    public AbstractContext doReceiveMessage(List<JSONObject> list, boolean z) {
        if (list == null || list.size() == 0) {
            return null;
        }
        int i = 0;
        for (JSONObject jSONObject : list) {
            if (i == list.size() - 1) {
                doReceiveMessage(jSONObject, z);
            } else {
                doReceiveMessage(jSONObject, false);
            }
            i++;
        }
        return null;
    }

    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    public boolean supportRemoveSplitFind() {
        return false;
    }

    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    public boolean supportOffsetRest() {
        return false;
    }

    @Override // org.apache.rocketmq.streams.common.channel.source.AbstractSource
    public boolean isBatchMessage() {
        return true;
    }

    public String getQueueId() {
        return RuntimeUtil.getDipperInstanceId();
    }

    public Long createOffset() {
        return Long.valueOf(this.offsetGenerator.incrementAndGet());
    }
}
