package org.apache.iotdb.confignode.manager.pipe.agent.task;

import java.util.Map;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractConnectorSubtask;
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.confignode.manager.pipe.metric.sink.PipeConfigRegionConnectorMetrics;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.class */
public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfigNodeSubtask.class);
    private final String pipeName;
    private final PipeTaskMeta pipeTaskMeta;
    private PipeExtractor extractor;
    private PipeProcessor processor;

    public PipeConfigNodeSubtask(String str, long j, Map<String, String> map, Map<String, String> map2, Map<String, String> map3, PipeTaskMeta pipeTaskMeta) throws Exception {
        super(str + "_" + j, j, (PipeConnector) null);
        this.pipeName = str;
        this.pipeTaskMeta = pipeTaskMeta;
        initExtractor(map);
        initProcessor(map2);
        initConnector(map3);
        PipeConfigRegionConnectorMetrics.getInstance().register(this);
        PipeEventCommitManager.getInstance().register(str, j, ConfigNodeInfo.CONFIG_REGION_ID.getId(), str + "_" + j);
    }

    private void initExtractor(Map<String, String> map) throws Exception {
        PipeParameters pipeParameters = new PipeParameters(map);
        this.extractor = PipeConfigNodeAgent.plugin().reflectExtractor(pipeParameters);
        try {
            this.extractor.validate(new PipeParameterValidator(pipeParameters));
            this.extractor.customize(pipeParameters, new PipeTaskRuntimeConfiguration(new PipeTaskExtractorRuntimeEnvironment(this.pipeName, this.creationTime, ConfigNodeInfo.CONFIG_REGION_ID.getId(), this.pipeTaskMeta)));
        } catch (Exception e) {
            try {
                this.extractor.close();
            } catch (Exception e2) {
                LOGGER.warn("Failed to close extractor after failed to initialize extractor. Ignore this exception.", e2);
            }
            throw e;
        }
    }

    private void initProcessor(Map<String, String> map) {
        PipeParameters pipeParameters = new PipeParameters(map);
        this.processor = PipeConfigNodeAgent.plugin().getConfiguredProcessor(pipeParameters.getStringOrDefault("processor", BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName()), pipeParameters, new PipeTaskRuntimeConfiguration(new PipeTaskRuntimeEnvironment(this.pipeName, this.creationTime, ConfigNodeInfo.CONFIG_REGION_ID.getId())));
    }

    private void initConnector(Map<String, String> map) throws Exception {
        PipeParameters pipeParameters = new PipeParameters(map);
        this.outputPipeConnector = PipeConfigNodeAgent.plugin().reflectConnector(pipeParameters);
        try {
            this.outputPipeConnector.validate(new PipeParameterValidator(pipeParameters));
            this.outputPipeConnector.customize(pipeParameters, new PipeTaskRuntimeConfiguration(new PipeTaskRuntimeEnvironment(this.pipeName, this.creationTime, ConfigNodeInfo.CONFIG_REGION_ID.getId())));
            this.outputPipeConnector.handshake();
        } catch (Exception e) {
            try {
                this.outputPipeConnector.close();
            } catch (Exception e2) {
                LOGGER.warn("Failed to close connector after failed to initialize connector. Ignore this exception.", e2);
            }
            throw e;
        }
    }

    protected boolean executeOnce() throws Exception {
        if (this.isClosed.get()) {
            return false;
        }
        Event supply = this.lastEvent != null ? this.lastEvent : this.extractor.supply();
        setLastEvent(supply);
        if (supply == null) {
            return false;
        }
        try {
            if (!(supply instanceof ProgressReportEvent)) {
                this.outputPipeConnector.transfer(supply);
                PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(this.taskID);
            }
            decreaseReferenceCountAndReleaseLastEvent(supply, true);
            return true;
        } catch (Exception e) {
            setLastExceptionEvent(supply);
            if (!this.isClosed.get()) {
                throw new PipeException(String.format("Exception in pipe transfer, subtask: %s, last event: %s", this.taskID, this.lastEvent), e);
            }
            LOGGER.info("Exception in pipe transfer, ignored because pipe is dropped.", e);
            clearReferenceCountAndReleaseLastEvent(supply);
            return true;
        } catch (PipeException e2) {
            setLastExceptionEvent(supply);
            if (!this.isClosed.get()) {
                throw e2;
            }
            LOGGER.info("{} in pipe transfer, ignored because pipe is dropped.", e2.getClass().getSimpleName(), e2);
            clearReferenceCountAndReleaseLastEvent(supply);
            return true;
        }
    }

    public void close() {
        this.isClosed.set(true);
        PipeEventCommitManager.getInstance().deregister(this.pipeName, this.creationTime, ConfigNodeInfo.CONFIG_REGION_ID.getId());
        PipeConfigRegionConnectorMetrics.getInstance().deregister(this.taskID);
        try {
            this.extractor.close();
        } catch (Exception e) {
            LOGGER.info("Error occurred during closing PipeExtractor.", e);
        }
        try {
            this.processor.close();
        } catch (Exception e2) {
            LOGGER.info("Error occurred during closing PipeProcessor.", e2);
        }
        try {
            this.outputPipeConnector.close();
        } catch (Exception e3) {
            LOGGER.info("Error occurred during closing PipeConnector.", e3);
        } finally {
            super.close();
        }
    }

    protected String getRootCause(Throwable th) {
        if (th != null) {
            return th.getMessage();
        }
        return null;
    }

    protected void report(EnrichedEvent enrichedEvent, PipeRuntimeException pipeRuntimeException) {
        PipeConfigNodeAgent.runtime().report(enrichedEvent, pipeRuntimeException);
    }

    public String getPipeName() {
        return this.pipeName;
    }
}
