package org.apache.iotdb.db.pipe.agent.task.subtask.connector;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.class */
public class PipeConnectorSubtaskManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtaskManager.class);
    private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE = "Failed to deregister PipeConnectorSubtask. No such subtask: ";
    private final Map<String, List<PipeConnectorSubtaskLifeCycle>> attributeSortedString2SubtaskLifeCycleMap;

    /* loaded from: input_file:org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager$PipeSubtaskManagerHolder.class */
    private static class PipeSubtaskManagerHolder {
        private static final PipeConnectorSubtaskManager INSTANCE = new PipeConnectorSubtaskManager();

        private PipeSubtaskManagerHolder() {
        }
    }

    public synchronized String register(PipeConnectorSubtaskExecutor pipeConnectorSubtaskExecutor, PipeParameters pipeParameters, PipeTaskConnectorRuntimeEnvironment pipeTaskConnectorRuntimeEnvironment) {
        int i;
        String str;
        PipeEventCommitManager.getInstance().register(pipeTaskConnectorRuntimeEnvironment.getPipeName(), pipeTaskConnectorRuntimeEnvironment.getCreationTime(), pipeTaskConnectorRuntimeEnvironment.getRegionId(), pipeParameters.getStringOrDefault(Arrays.asList("connector", "sink"), BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()).toLowerCase());
        boolean contains = StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(pipeTaskConnectorRuntimeEnvironment.getRegionId()));
        boolean z = false;
        String generateAttributeSortedString = generateAttributeSortedString(pipeParameters);
        if (contains) {
            i = pipeParameters.getIntOrDefault(Arrays.asList("connector.parallel.tasks", "sink.parallel.tasks"), PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
            z = pipeParameters.getBooleanOrDefault(Arrays.asList("connector.realtime-first", "sink.realtime-first"), true);
            str = "data_" + generateAttributeSortedString;
        } else {
            i = 1;
            str = "schema_" + generateAttributeSortedString;
        }
        if (!this.attributeSortedString2SubtaskLifeCycleMap.containsKey(str)) {
            ArrayList arrayList = new ArrayList(i);
            UnboundedBlockingPendingQueue pipeRealtimePriorityBlockingQueue = z ? new PipeRealtimePriorityBlockingQueue() : new UnboundedBlockingPendingQueue(new PipeDataRegionEventCounter());
            for (int i2 = 0; i2 < i; i2++) {
                PipeConnector reflectConnector = contains ? PipeDataNodeAgent.plugin().dataRegion().reflectConnector(pipeParameters) : PipeDataNodeAgent.plugin().schemaRegion().reflectConnector(pipeParameters);
                try {
                    reflectConnector.validate(new PipeParameterValidator(pipeParameters));
                    reflectConnector.customize(pipeParameters, new PipeTaskRuntimeConfiguration(pipeTaskConnectorRuntimeEnvironment));
                    reflectConnector.handshake();
                    arrayList.add(new PipeConnectorSubtaskLifeCycle(pipeConnectorSubtaskExecutor, new PipeConnectorSubtask(String.format("%s_%s_%s", str, Long.valueOf(pipeTaskConnectorRuntimeEnvironment.getCreationTime()), Integer.valueOf(i2)), pipeTaskConnectorRuntimeEnvironment.getCreationTime(), str, i2, pipeRealtimePriorityBlockingQueue, reflectConnector), pipeRealtimePriorityBlockingQueue));
                } catch (Exception e) {
                    try {
                        reflectConnector.close();
                    } catch (Exception e2) {
                        LOGGER.warn("Failed to close connector after failed to initialize connector. Ignore this exception.", e2);
                    }
                    throw new PipeException("Failed to construct PipeConnector, because of " + e.getMessage(), e);
                }
            }
            this.attributeSortedString2SubtaskLifeCycleMap.put(str, arrayList);
        }
        Iterator<PipeConnectorSubtaskLifeCycle> it = this.attributeSortedString2SubtaskLifeCycleMap.get(str).iterator();
        while (it.hasNext()) {
            it.next().register();
        }
        return str;
    }

    public synchronized void deregister(String str, long j, int i, String str2) {
        if (!this.attributeSortedString2SubtaskLifeCycleMap.containsKey(str2)) {
            throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + str2);
        }
        List<PipeConnectorSubtaskLifeCycle> list = this.attributeSortedString2SubtaskLifeCycleMap.get(str2);
        list.removeIf(pipeConnectorSubtaskLifeCycle -> {
            return pipeConnectorSubtaskLifeCycle.deregister(str, i);
        });
        if (list.isEmpty()) {
            this.attributeSortedString2SubtaskLifeCycleMap.remove(str2);
        }
        PipeEventCommitManager.getInstance().deregister(str, j, i);
    }

    public synchronized void start(String str) {
        if (!this.attributeSortedString2SubtaskLifeCycleMap.containsKey(str)) {
            throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + str);
        }
        Iterator<PipeConnectorSubtaskLifeCycle> it = this.attributeSortedString2SubtaskLifeCycleMap.get(str).iterator();
        while (it.hasNext()) {
            it.next().start();
        }
    }

    public synchronized void stop(String str) {
        if (!this.attributeSortedString2SubtaskLifeCycleMap.containsKey(str)) {
            throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + str);
        }
        Iterator<PipeConnectorSubtaskLifeCycle> it = this.attributeSortedString2SubtaskLifeCycleMap.get(str).iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
    }

    public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(String str) {
        if (this.attributeSortedString2SubtaskLifeCycleMap.containsKey(str)) {
            return this.attributeSortedString2SubtaskLifeCycleMap.get(str).get(0).getPendingQueue();
        }
        throw new PipeException("Failed to get PendingQueue. No such subtask: " + str);
    }

    private String generateAttributeSortedString(PipeParameters pipeParameters) {
        TreeMap treeMap = new TreeMap(pipeParameters.getAttribute());
        treeMap.remove("__system.restart");
        return treeMap.toString();
    }

    private PipeConnectorSubtaskManager() {
        this.attributeSortedString2SubtaskLifeCycleMap = new HashMap();
    }

    public static PipeConnectorSubtaskManager instance() {
        return PipeSubtaskManagerHolder.INSTANCE;
    }
}
