package org.apache.iotdb.confignode.procedure.impl.subscription.subscription;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.subscription.SubscriptionOperation;
import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.AlterConsumerGroupProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.topic.AlterTopicProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.class */
/**
 * Procedure handling a subscribe request (operation {@link SubscriptionOperation#CREATE_SUBSCRIPTION}).
 *
 * <p>During validation the procedure builds the sub-procedures it later drives:
 *
 * <ul>
 *   <li>one {@link AlterConsumerGroupProcedure} carrying a deep copy of the consumer group meta
 *       with the requested subscription added, and
 *   <li>one {@link CreatePipeProcedureV2} for every requested topic that is either not yet
 *       subscribed by the consumer group or whose subscription pipe does not exist.
 * </ul>
 *
 * <p>The remaining steps delegate to those sub-procedures and write / roll back the pipe plans as
 * a single {@link OperateMultiplePipesPlanV2}.
 */
public class CreateSubscriptionProcedure extends AbstractOperateSubscriptionAndPipeProcedure {
    private static final Logger LOGGER = LoggerFactory.getLogger(CreateSubscriptionProcedure.class);

    /** The subscribe request this procedure executes; restored by {@link #deserialize(ByteBuffer)}. */
    private TSubscribeReq subscribeReq;

    /** Built in {@link #executeFromValidate}; {@code null} until then (or until deserialized/set). */
    private AlterConsumerGroupProcedure alterConsumerGroupProcedure;

    /** Pipe creation sub-procedures; not final because a test-only setter may replace the list. */
    private List<CreatePipeProcedureV2> createPipeProcedures = new ArrayList<>();

    /**
     * Only filled by {@link #deserialize(ByteBuffer)} in this class; it is still written by {@link
     * #serialize(DataOutputStream)} so that both sides agree on the byte layout.
     */
    private final List<AlterTopicProcedure> alterTopicProcedures = new ArrayList<>();

    /** Creates an empty procedure, to be populated through {@link #deserialize(ByteBuffer)}. */
    public CreateSubscriptionProcedure() {
    }

    /**
     * Creates a procedure for the given subscribe request.
     *
     * @param tSubscribeReq the request naming consumer, consumer group and topics
     */
    public CreateSubscriptionProcedure(TSubscribeReq tSubscribeReq) {
        this.subscribeReq = tSubscribeReq;
    }

    /** @return always {@link SubscriptionOperation#CREATE_SUBSCRIPTION} */
    @Override // org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure
    protected SubscriptionOperation getOperation() {
        return SubscriptionOperation.CREATE_SUBSCRIPTION;
    }

    /**
     * Validates the request, builds the sub-procedures and runs their own validation steps.
     *
     * @param configNodeProcedureEnv environment handed on to the sub-procedures
     * @return always {@code true} when no exception is thrown
     * @throws SubscriptionException declared by the overridden method; presumably raised by the
     *     validation calls below — confirm against their implementations
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure
    protected boolean executeFromValidate(ConfigNodeProcedureEnv configNodeProcedureEnv) throws SubscriptionException {
        LOGGER.info("CreateSubscriptionProcedure: executeFromValidate");
        this.subscriptionInfo.get().validateBeforeSubscribe(this.subscribeReq);

        // Consumer group side: work on a deep copy with the new subscription added.
        final String consumerGroupId = this.subscribeReq.getConsumerGroupId();
        final ConsumerGroupMeta updatedConsumerGroupMeta =
                this.subscriptionInfo.get().deepCopyConsumerGroupMeta(consumerGroupId);
        updatedConsumerGroupMeta.addSubscription(this.subscribeReq.getConsumerId(), this.subscribeReq.getTopicNames());
        this.alterConsumerGroupProcedure = new AlterConsumerGroupProcedure(updatedConsumerGroupMeta, this.subscriptionInfo);

        // Pipe side: one pipe per topic, unless the topic is already subscribed by this group
        // AND its pipe already exists.
        // NOTE(review): the list is appended to without being cleared; if this step can run more
        // than once on the same instance, entries would accumulate — confirm.
        for (final String topicName : this.subscribeReq.getTopicNames()) {
            final String pipeName = PipeStaticMeta.generateSubscriptionPipeName(topicName, consumerGroupId);
            if (this.subscriptionInfo.get().isTopicSubscribedByConsumerGroup(topicName, consumerGroupId)
                    && this.pipeTaskInfo.get().isPipeExisted(pipeName)) {
                continue;
            }
            final TopicMeta topicMeta = this.subscriptionInfo.get().deepCopyTopicMeta(topicName);
            final TCreatePipeReq createPipeReq =
                    new TCreatePipeReq()
                            .setPipeName(pipeName)
                            .setExtractorAttributes(topicMeta.generateExtractorAttributes())
                            .setProcessorAttributes(topicMeta.generateProcessorAttributes())
                            .setConnectorAttributes(topicMeta.generateConnectorAttributes(consumerGroupId));
            this.createPipeProcedures.add(new CreatePipeProcedureV2(createPipeReq, this.pipeTaskInfo));
        }

        // Run the sub-procedures' own validation / preparation steps.
        this.alterConsumerGroupProcedure.executeFromValidate(configNodeProcedureEnv);
        for (final CreatePipeProcedureV2 createPipeProcedure : this.createPipeProcedures) {
            createPipeProcedure.executeFromValidateTask(configNodeProcedureEnv);
            createPipeProcedure.executeFromCalculateInfoForTask(configNodeProcedureEnv);
        }
        return true;
    }

    /**
     * Applies the consumer group change on config nodes, then writes the plans of all pipe
     * sub-procedures through the consensus layer as one {@link OperateMultiplePipesPlanV2}.
     *
     * @param configNodeProcedureEnv environment providing the consensus manager
     * @throws SubscriptionException if the returned status is not successful and carries at least
     *     one sub-status
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure
    protected void executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws SubscriptionException {
        LOGGER.info("CreateSubscriptionProcedure: executeFromOperateOnConfigNodes");
        this.alterConsumerGroupProcedure.executeFromOperateOnConfigNodes(configNodeProcedureEnv);

        TSStatus response;
        try {
            response =
                    configNodeProcedureEnv
                            .getConfigManager()
                            .getConsensusManager()
                            .write(
                                    new OperateMultiplePipesPlanV2(
                                            this.createPipeProcedures.stream()
                                                    .map(CreatePipeProcedureV2::constructPlan)
                                                    .collect(Collectors.toList())));
        } catch (ConsensusException e) {
            response = consensusFailureStatus(e);
        }

        // NOTE(review): a failure status without sub-statuses (e.g. the one built from a
        // ConsensusException above) does not throw here, unlike the rollback counterpart which
        // checks the code only — confirm this is intended.
        final boolean failed = response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode();
        if (failed && response.getSubStatusSize() > 0) {
            throw new SubscriptionException(String.format("Failed to create subscription with request %s on config nodes, because %s", this.subscribeReq, response));
        }
    }

    /**
     * Applies the consumer group change on data nodes, then pushes the meta of the pipes created by
     * this procedure to the data nodes.
     *
     * @param configNodeProcedureEnv environment handed on to the push helpers
     * @throws SubscriptionException if the parsed push result is a non-empty error message
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure
    protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws SubscriptionException, IOException {
        LOGGER.info("CreateSubscriptionProcedure: executeFromOperateOnDataNodes");
        this.alterConsumerGroupProcedure.executeFromOperateOnDataNodes(configNodeProcedureEnv);

        final List<String> pipeNames =
                this.createPipeProcedures.stream()
                        .map(CreatePipeProcedureV2::getPipeName)
                        .collect(Collectors.toList());
        final String exceptionMessage =
                AbstractOperatePipeProcedureV2.parsePushPipeMetaExceptionForPipe(
                        null, pushMultiPipeMetaToDataNodes(pipeNames, configNodeProcedureEnv));
        if (!exceptionMessage.isEmpty()) {
            throw new SubscriptionException(String.format("Failed to create pipes %s when creating subscription with request %s, details: %s, metadata will be synchronized later.", pipeNames, this.subscribeReq, exceptionMessage));
        }
    }

    /** Only logs; this class undoes nothing for the validation step. */
    @Override // org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure
    protected void rollbackFromValidate(ConfigNodeProcedureEnv configNodeProcedureEnv) {
        LOGGER.info("CreateSubscriptionProcedure: rollbackFromValidate");
    }

    /**
     * Writes one {@link DropPipePlanV2} per pipe sub-procedure through the consensus layer, then
     * rolls back the consumer group change on config nodes.
     *
     * @param configNodeProcedureEnv environment providing the consensus manager
     * @throws SubscriptionException if the returned status is not successful; in that case the
     *     consumer group rollback is not reached
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure
    protected void rollbackFromOperateOnConfigNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws SubscriptionException {
        LOGGER.info("CreateSubscriptionProcedure: rollbackFromOperateOnConfigNodes");

        TSStatus response;
        try {
            response =
                    configNodeProcedureEnv
                            .getConfigManager()
                            .getConsensusManager()
                            .write(
                                    new OperateMultiplePipesPlanV2(
                                            this.createPipeProcedures.stream()
                                                    .map(createPipeProcedure -> new DropPipePlanV2(createPipeProcedure.getPipeName()))
                                                    .collect(Collectors.toList())));
        } catch (ConsensusException e) {
            response = consensusFailureStatus(e);
        }

        if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            throw new SubscriptionException(String.format("Failed to rollback creating subscription with request %s on config nodes, because %s", this.subscribeReq, response));
        }
        this.alterConsumerGroupProcedure.rollbackFromOperateOnConfigNodes(configNodeProcedureEnv);
    }

    /**
     * Pushes the pipe meta held by {@code pipeTaskInfo} to the data nodes, then rolls back the
     * consumer group change on data nodes.
     *
     * @param configNodeProcedureEnv environment handed on to the push helpers
     * @throws SubscriptionException if the parsed push result is a non-empty error message; in that
     *     case the consumer group rollback is not reached
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure
    protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv configNodeProcedureEnv) throws SubscriptionException, IOException {
        LOGGER.info("CreateSubscriptionProcedure: rollbackFromOperateOnDataNodes");

        final String exceptionMessage =
                AbstractOperatePipeProcedureV2.parsePushPipeMetaExceptionForPipe(
                        null, AbstractOperatePipeProcedureV2.pushPipeMetaToDataNodes(configNodeProcedureEnv, this.pipeTaskInfo));
        if (!exceptionMessage.isEmpty()) {
            throw new SubscriptionException(String.format("Failed to rollback create pipes when creating subscription with request %s, because %s", this.subscribeReq, exceptionMessage));
        }
        this.alterConsumerGroupProcedure.rollbackFromOperateOnDataNodes(configNodeProcedureEnv);
    }

    /**
     * Writes this procedure to the stream in the following order: procedure type code, superclass
     * state, consumer id, consumer group id, topic name count and names, then three optional
     * sections (consumer group procedure, topic procedures, pipe procedures), each preceded by a
     * boolean presence flag.
     *
     * @param dataOutputStream destination stream
     * @throws IOException if writing fails
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure, org.apache.iotdb.confignode.procedure.Procedure
    public void serialize(DataOutputStream dataOutputStream) throws IOException {
        dataOutputStream.writeShort(ProcedureType.CREATE_SUBSCRIPTION_PROCEDURE.getTypeCode());
        super.serialize(dataOutputStream);

        ReadWriteIOUtils.write(this.subscribeReq.getConsumerId(), dataOutputStream);
        ReadWriteIOUtils.write(this.subscribeReq.getConsumerGroupId(), dataOutputStream);
        final int topicNamesSize = this.subscribeReq.getTopicNamesSize();
        ReadWriteIOUtils.write(topicNamesSize, dataOutputStream);
        // Guarded so the topic collection is only touched when the request reports entries.
        if (topicNamesSize != 0) {
            for (final String topicName : this.subscribeReq.getTopicNames()) {
                ReadWriteIOUtils.write(topicName, dataOutputStream);
            }
        }

        final boolean hasAlterConsumerGroupProcedure = this.alterConsumerGroupProcedure != null;
        ReadWriteIOUtils.write(hasAlterConsumerGroupProcedure, dataOutputStream);
        if (hasAlterConsumerGroupProcedure) {
            this.alterConsumerGroupProcedure.serialize(dataOutputStream);
        }

        // alterTopicProcedures is final and initialised at declaration, so it is never null; the
        // presence flag is still written because deserialize reads it.
        ReadWriteIOUtils.write(true, dataOutputStream);
        ReadWriteIOUtils.write(this.alterTopicProcedures.size(), dataOutputStream);
        for (final AlterTopicProcedure alterTopicProcedure : this.alterTopicProcedures) {
            alterTopicProcedure.serialize(dataOutputStream);
        }

        // createPipeProcedures may have been replaced by null through the test-only setter.
        if (this.createPipeProcedures == null) {
            ReadWriteIOUtils.write(false, dataOutputStream);
        } else {
            ReadWriteIOUtils.write(true, dataOutputStream);
            ReadWriteIOUtils.write(this.createPipeProcedures.size(), dataOutputStream);
            for (final CreatePipeProcedureV2 createPipeProcedure : this.createPipeProcedures) {
                createPipeProcedure.serialize(dataOutputStream);
            }
        }
    }

    /**
     * Restores the state written by {@link #serialize(DataOutputStream)}. The leading procedure type
     * code of this procedure is not read here; presumably the caller has consumed it — confirm.
     *
     * @param byteBuffer buffer positioned at the superclass state
     */
    @Override // org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure, org.apache.iotdb.confignode.procedure.Procedure
    public void deserialize(ByteBuffer byteBuffer) {
        super.deserialize(byteBuffer);

        // Read order must mirror serialize: consumer id first, then consumer group id.
        final String consumerId = ReadWriteIOUtils.readString(byteBuffer);
        final String consumerGroupId = ReadWriteIOUtils.readString(byteBuffer);
        this.subscribeReq =
                new TSubscribeReq()
                        .setConsumerId(consumerId)
                        .setConsumerGroupId(consumerGroupId)
                        .setTopicNames(new HashSet<>());
        final int topicNamesSize = ReadWriteIOUtils.readInt(byteBuffer);
        for (int i = 0; i < topicNamesSize; i++) {
            this.subscribeReq.getTopicNames().add(ReadWriteIOUtils.readString(byteBuffer));
        }

        if (ReadWriteIOUtils.readBool(byteBuffer)) {
            // Discard a leading short; presumably the nested procedure's type code, as this
            // class's own serialize writes one first — confirm.
            ReadWriteIOUtils.readShort(byteBuffer);
            this.alterConsumerGroupProcedure = new AlterConsumerGroupProcedure();
            this.alterConsumerGroupProcedure.deserialize(byteBuffer);
        }

        if (ReadWriteIOUtils.readBool(byteBuffer)) {
            final int alterTopicProcedureCount = ReadWriteIOUtils.readInt(byteBuffer);
            for (int i = 0; i < alterTopicProcedureCount; i++) {
                // Leading short discarded, same as for the consumer group procedure above.
                ReadWriteIOUtils.readShort(byteBuffer);
                final AlterTopicProcedure alterTopicProcedure = new AlterTopicProcedure();
                alterTopicProcedure.deserialize(byteBuffer);
                this.alterTopicProcedures.add(alterTopicProcedure);
            }
        }

        if (ReadWriteIOUtils.readBool(byteBuffer)) {
            final int createPipeProcedureCount = ReadWriteIOUtils.readInt(byteBuffer);
            for (int i = 0; i < createPipeProcedureCount; i++) {
                // NOTE(review): on a type code mismatch the entry is skipped without consuming
                // its bytes, so later reads would start inside that entry — confirm whether a
                // mismatch can occur.
                if (ReadWriteIOUtils.readShort(byteBuffer) != ProcedureType.CREATE_PIPE_PROCEDURE_V2.getTypeCode()) {
                    continue;
                }
                final CreatePipeProcedureV2 createPipeProcedure = new CreatePipeProcedureV2();
                createPipeProcedure.deserialize(byteBuffer);
                this.createPipeProcedures.add(createPipeProcedure);
            }
        }
    }

    /**
     * Logs a consensus write failure and converts it into a status with code {@link
     * TSStatusCode#EXECUTE_STATEMENT_ERROR} and the exception message.
     */
    private static TSStatus consensusFailureStatus(ConsensusException e) {
        LOGGER.warn("Failed in the write API executing the consensus layer due to: ", e);
        final TSStatus status = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
        status.setMessage(e.getMessage());
        return status;
    }

    /**
     * Compares procedure id, current state, cycles, request, consumer group procedure and pipe
     * procedures. {@code alterTopicProcedures} is not part of the comparison.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final CreateSubscriptionProcedure that = (CreateSubscriptionProcedure) obj;
        return Objects.equals(getProcId(), that.getProcId())
                && Objects.equals(getCurrentState(), that.getCurrentState())
                && getCycles() == that.getCycles()
                && Objects.equals(this.subscribeReq, that.subscribeReq)
                && Objects.equals(this.alterConsumerGroupProcedure, that.alterConsumerGroupProcedure)
                && Objects.equals(this.createPipeProcedures, that.createPipeProcedures);
    }

    /** Hashes the same components that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        return Objects.hash(
                getProcId(),
                getCurrentState(),
                getCycles(),
                this.subscribeReq,
                this.alterConsumerGroupProcedure,
                this.createPipeProcedures);
    }

    /** Replaces the consumer group sub-procedure; intended for tests only. */
    @TestOnly
    public void setAlterConsumerGroupProcedure(AlterConsumerGroupProcedure alterConsumerGroupProcedure) {
        this.alterConsumerGroupProcedure = alterConsumerGroupProcedure;
    }

    /** @return the consumer group sub-procedure, possibly {@code null}; intended for tests only */
    @TestOnly
    public AlterConsumerGroupProcedure getAlterConsumerGroupProcedure() {
        return this.alterConsumerGroupProcedure;
    }

    /** Replaces the pipe sub-procedure list (stored as-is, not copied); intended for tests only. */
    @TestOnly
    public void setCreatePipeProcedures(List<CreatePipeProcedureV2> list) {
        this.createPipeProcedures = list;
    }

    /** @return the internal pipe sub-procedure list itself, not a copy; intended for tests only */
    @TestOnly
    public List<CreatePipeProcedureV2> getCreatePipeProcedures() {
        return this.createPipeProcedures;
    }
}
