package org.apache.iotdb.confignode.manager.pipe.execution;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.task.DecoratingLock;
import org.apache.iotdb.commons.pipe.task.subtask.PipeSubtask;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.class */
public class PipeConfigNodeSubtask extends PipeSubtask {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfigNodeSubtask.class);
    private final PipeExtractor extractor;
    private final PipeProcessor processor;
    private final PipeConnector connector;
    private final DecoratingLock callbackDecoratingLock;
    private ExecutorService subtaskCallbackListeningExecutor;
    private volatile boolean isSubmitted;

    public PipeConfigNodeSubtask(String str, long j, Map<String, String> map, Map<String, String> map2, Map<String, String> map3) throws Exception {
        super(str, j);
        this.callbackDecoratingLock = new DecoratingLock();
        this.isSubmitted = false;
        this.extractor = initExtractor(map);
        this.processor = initProcessor(map2);
        this.connector = initConnector(map3);
    }

    private PipeExtractor initExtractor(Map<String, String> map) throws Exception {
        PipeParameters pipeParameters = new PipeParameters(map);
        PipeExtractor reflectExtractor = PipeConfigNodeAgent.plugin().reflectExtractor(pipeParameters);
        reflectExtractor.validate(new PipeParameterValidator(pipeParameters));
        reflectExtractor.customize(pipeParameters, new PipeTaskRuntimeConfiguration(new PipeTaskRuntimeEnvironment(this.taskID, this.creationTime, ConfigNodeInfo.CONFIG_REGION_ID.getId())));
        return reflectExtractor;
    }

    private PipeProcessor initProcessor(Map<String, String> map) throws Exception {
        PipeParameters pipeParameters = new PipeParameters(map);
        PipeProcessor reflectProcessor = PipeConfigNodeAgent.plugin().reflectProcessor(pipeParameters);
        reflectProcessor.validate(new PipeParameterValidator(pipeParameters));
        reflectProcessor.customize(pipeParameters, new PipeTaskRuntimeConfiguration(new PipeTaskRuntimeEnvironment(this.taskID, this.creationTime, ConfigNodeInfo.CONFIG_REGION_ID.getId())));
        return reflectProcessor;
    }

    private PipeConnector initConnector(Map<String, String> map) throws Exception {
        PipeParameters pipeParameters = new PipeParameters(map);
        PipeConnector reflectConnector = PipeConfigNodeAgent.plugin().reflectConnector(pipeParameters);
        reflectConnector.validate(new PipeParameterValidator(pipeParameters));
        reflectConnector.customize(pipeParameters, new PipeTaskRuntimeConfiguration(new PipeTaskRuntimeEnvironment(this.taskID, this.creationTime, ConfigNodeInfo.CONFIG_REGION_ID.getId())));
        reflectConnector.handshake();
        return reflectConnector;
    }

    public void bindExecutors(ListeningExecutorService listeningExecutorService, ExecutorService executorService, PipeSubtaskScheduler pipeSubtaskScheduler) {
        this.subtaskWorkerThreadPoolExecutor = listeningExecutorService;
        this.subtaskCallbackListeningExecutor = executorService;
        this.subtaskScheduler = pipeSubtaskScheduler;
    }

    protected boolean executeOnce() throws Exception {
        if (this.isClosed.get()) {
            return false;
        }
        Event supply = this.lastEvent != null ? this.lastEvent : this.extractor.supply();
        setLastEvent(supply);
        if (supply == null) {
            return false;
        }
        try {
            this.connector.transfer(supply);
            releaseLastEvent(true);
            return true;
        } catch (Exception e) {
            if (!this.isClosed.get()) {
                throw new PipeException("Error occurred during executing PipeConnector#transfer.", e);
            }
            LOGGER.info("Exception in pipe transfer, ignored because pipe is dropped.");
            releaseLastEvent(false);
            return true;
        } catch (PipeConnectionException e2) {
            if (!this.isClosed.get()) {
                throw e2;
            }
            LOGGER.info("PipeConnectionException in pipe transfer, ignored because pipe is dropped.");
            releaseLastEvent(false);
            return true;
        }
    }

    /* renamed from: call, reason: merged with bridge method [inline-methods] */
    public Boolean m85call() throws Exception {
        boolean booleanValue = super.call().booleanValue();
        this.callbackDecoratingLock.waitForDecorated();
        return Boolean.valueOf(booleanValue);
    }

    public synchronized void onSuccess(Boolean bool) {
        this.isSubmitted = false;
        super.onSuccess(bool);
    }

    public synchronized void onFailure(Throwable th) {
        this.isSubmitted = false;
        if (this.isClosed.get()) {
            LOGGER.info("onFailure in pipe config node subtask, ignored because pipe is dropped.", th);
            releaseLastEvent(false);
            return;
        }
        if (this.retryCount.get() == 0) {
            LOGGER.warn("Failed to execute subtask {}({}), because of {}. Will retry forever until success.", new Object[]{this.taskID, getClass().getSimpleName(), th.getMessage(), th});
        }
        this.retryCount.incrementAndGet();
        LOGGER.warn("Retry executing subtask {}({}), retry count {}", new Object[]{this.taskID, getClass().getSimpleName(), Integer.valueOf(this.retryCount.get())});
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            LOGGER.warn("Interrupted when retrying to execute subtask {}({})", this.taskID, getClass().getSimpleName());
            Thread.currentThread().interrupt();
        }
        submitSelf();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void submitSelf() {
        if (this.shouldStopSubmittingSelf.get() || this.isSubmitted) {
            return;
        }
        this.callbackDecoratingLock.markAsDecorating();
        try {
            Futures.addCallback(this.subtaskWorkerThreadPoolExecutor.submit(this), this, this.subtaskCallbackListeningExecutor);
            this.isSubmitted = true;
        } finally {
            this.callbackDecoratingLock.markAsDecorated();
        }
    }

    public void close() {
        this.isClosed.set(true);
        try {
            this.extractor.close();
        } catch (Exception e) {
            LOGGER.info("Error occurred during closing PipeExtractor.", e);
        }
        try {
            this.processor.close();
        } catch (Exception e2) {
            LOGGER.info("Error occurred during closing PipeProcessor.", e2);
        }
        try {
            this.connector.close();
        } catch (Exception e3) {
            LOGGER.info("Error occurred during closing PipeConnector.", e3);
        } finally {
            super.close();
        }
    }

    protected synchronized void releaseLastEvent(boolean z) {
        if (this.lastEvent != null) {
            this.lastEvent = null;
        }
    }
}
