package org.apache.iotdb.db.pipe.agent.task.stage;

import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.connection.EventSupplier;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.stage.PipeTaskStage;
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.agent.task.execution.PipeProcessorSubtaskExecutor;
import org.apache.iotdb.db.pipe.agent.task.subtask.processor.PipeProcessorSubtask;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;

/* loaded from: input_file:org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.class */
public class PipeTaskProcessorStage extends PipeTaskStage {
    private final PipeProcessorSubtaskExecutor executor;
    private final PipeProcessorSubtask pipeProcessorSubtask;

    public PipeTaskProcessorStage(String str, long j, PipeParameters pipeParameters, int i, EventSupplier eventSupplier, UnboundedBlockingPendingQueue<Event> unboundedBlockingPendingQueue, PipeProcessorSubtaskExecutor pipeProcessorSubtaskExecutor, PipeTaskMeta pipeTaskMeta, boolean z) {
        PipeProcessorRuntimeConfiguration pipeTaskRuntimeConfiguration = new PipeTaskRuntimeConfiguration(new PipeTaskProcessorRuntimeEnvironment(str, j, i, pipeTaskMeta));
        this.pipeProcessorSubtask = new PipeProcessorSubtask(str + "_" + i + "_" + j, str, j, i, eventSupplier, StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(i)) ? PipeDataNodeAgent.plugin().dataRegion().getConfiguredProcessor(pipeParameters.getStringOrDefault("processor", BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName()), pipeParameters, pipeTaskRuntimeConfiguration) : PipeDataNodeAgent.plugin().schemaRegion().getConfiguredProcessor(pipeParameters.getStringOrDefault("processor", BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName()), pipeParameters, pipeTaskRuntimeConfiguration), new PipeEventCollector(unboundedBlockingPendingQueue, j, i, z));
        this.executor = pipeProcessorSubtaskExecutor;
    }

    public void createSubtask() throws PipeException {
        this.executor.register(this.pipeProcessorSubtask);
    }

    public void startSubtask() throws PipeException {
        this.executor.start(this.pipeProcessorSubtask.getTaskID());
    }

    public void stopSubtask() throws PipeException {
        this.executor.stop(this.pipeProcessorSubtask.getTaskID());
    }

    public void dropSubtask() throws PipeException {
        this.executor.deregister(this.pipeProcessorSubtask.getTaskID());
    }
}
