package org.apache.iotdb.db.pipe.connector.protocol.legacy;

import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.SubStringFunctionColumnTransformer;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.thrift.TException;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.class */
public class IoTDBLegacyPipeConnector implements PipeConnector {
    // Class-wide SLF4J logger.
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLegacyPipeConnector.class);
    // Shared node configuration; handshake() reads the connection timeout and the
    // thrift-compression flag from it.
    private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
    // Receiver host, read in customize() from "connector.ip" / "sink.ip".
    private String ipAddress;
    // Receiver port, read in customize() from "connector.port" / "sink.port".
    private int port;
    // Value of "sink.ssl.enable" (defaults to false in customize()).
    private boolean useSSL;
    // Value of "sink.ssl.trust-store-path"; passed to both the sync client and the session pool.
    private String trustStore;
    // Value of "sink.ssl.trust-store-pwd"; passed to both the sync client and the session pool.
    private String trustStorePwd;
    // Credentials handed to the SessionPool builder in handshake().
    private String user;
    private String password;
    // Version string sent in the handshake identity info ("1.1" unless overridden).
    private String syncConnectorVersion;
    // Pipe name taken from the runtime environment in customize(); sent in the handshake.
    private String pipeName;
    // Sync client created in handshake(); used for the handshake call and sendPipeData().
    private IoTDBSyncClient client;
    // Session pool created in handshake() after a successful handshake; used for tablet inserts.
    private SessionPool sessionPool;

    /**
     * Validates the connector parameters with three chained rules:
     *
     * <ol>
     *   <li>a complete ip/port pair is present under the {@code connector.} or the {@code sink.}
     *       prefix;
     *   <li>no configured endpoint whose port equals the local RPC port resolves to a local
     *       address (an unknown host also fails this rule);
     *   <li>when {@code sink.ssl.enable} is true, both trust-store attributes are present.
     * </ol>
     *
     * @param pipeParameterValidator validator wrapping the pipe parameters to check
     * @throws Exception whatever the validator raises when a rule is not satisfied
     */
    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        final PipeParameters params = pipeParameterValidator.getParameters();
        final IoTDBConfig localConfig = IoTDBDescriptor.getInstance().getConfig();
        final Set<TEndPoint> receivers = parseNodeUrls(pipeParameterValidator.getParameters());

        final String endpointRuleMessage =
            String.format(
                "Either %s:%s or %s:%s must be specified",
                "connector.ip", "connector.port", "sink.ip", "sink.port");
        final String loopbackRuleMessage =
            String.format(
                "One of the endpoints %s of the receivers is pointing back to the legacy receiver %s on sender itself, or unknown host when checking pipe sink IP.",
                receivers,
                new TEndPoint(localConfig.getRpcAddress(), localConfig.getRpcPort()));
        final String sslRuleMessage =
            String.format(
                "When %s is specified to true, %s and %s must be specified",
                "sink.ssl.enable", "sink.ssl.trust-store-path", "sink.ssl.trust-store-pwd");

        pipeParameterValidator
            .validate(
                flags -> {
                    // Either the connector.* pair or the sink.* pair has to be complete.
                    if ((Boolean) flags[0] && (Boolean) flags[1]) {
                        return true;
                    }
                    return (Boolean) flags[2] && (Boolean) flags[3];
                },
                endpointRuleMessage,
                new Object[] {
                    params.hasAttribute("connector.ip"),
                    params.hasAttribute("connector.port"),
                    params.hasAttribute("sink.ip"),
                    params.hasAttribute("sink.port")
                })
            .validate(
                ignoredArgs -> {
                    try {
                        // Only endpoints sharing the local RPC port can loop back to this node.
                        final List<String> sameRpcPortIps =
                            receivers.stream()
                                .filter(endPoint -> endPoint.getPort() == localConfig.getRpcPort())
                                .map(TEndPoint::getIp)
                                .collect(Collectors.toList());
                        return !NodeUrlUtils.containsLocalAddress(sameRpcPortIps);
                    } catch (UnknownHostException e) {
                        LOGGER.warn("Unknown host when checking pipe sink IP.", e);
                        return false;
                    }
                },
                loopbackRuleMessage,
                new Object[0])
            .validate(
                flags -> {
                    final boolean sslEnabled = (Boolean) flags[0];
                    if (!sslEnabled) {
                        return true;
                    }
                    return (Boolean) flags[1] && (Boolean) flags[2];
                },
                sslRuleMessage,
                new Object[] {
                    params.getBooleanOrDefault("sink.ssl.enable", false),
                    params.hasAttribute("sink.ssl.trust-store-path"),
                    params.hasAttribute("sink.ssl.trust-store-pwd")
                });
    }

    /**
     * Collects the receiver endpoints declared in the pipe parameters.
     *
     * <p>An endpoint is added for each prefix ({@code connector.} and {@code sink.}) for which both
     * the {@code ip} and the {@code port} attribute are present; incomplete pairs are ignored.
     *
     * @param pipeParameters the pipe parameters to read from
     * @return a mutable set with zero, one or two endpoints
     */
    private Set<TEndPoint> parseNodeUrls(PipeParameters pipeParameters) {
        // Typed set instead of a raw HashSet, so the return needs no unchecked conversion.
        final Set<TEndPoint> endPoints = new HashSet<>();
        final String[][] ipPortKeyPairs = {
            {"connector.ip", "connector.port"},
            {"sink.ip", "sink.port"}
        };
        for (final String[] keyPair : ipPortKeyPairs) {
            final String ipKey = keyPair[0];
            final String portKey = keyPair[1];
            if (pipeParameters.hasAttribute(ipKey) && pipeParameters.hasAttribute(portKey)) {
                endPoints.add(
                    new TEndPoint(
                        pipeParameters.getStringByKeys(new String[] {ipKey}),
                        pipeParameters.getIntByKeys(new String[] {portKey}).intValue()));
            }
        }
        return endPoints;
    }

    /**
     * Copies the connection settings out of the pipe parameters into this connector's fields.
     *
     * <p>User and password both fall back to {@link SqlConstant#ROOT}; the sync version falls back
     * to {@code "1.1"}; SSL is off unless {@code sink.ssl.enable} says otherwise.
     *
     * @param pipeParameters the pipe parameters to read from
     * @param pipeConnectorRuntimeConfiguration runtime configuration supplying the pipe name
     * @throws Exception propagated from the parameter accessors
     */
    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        // Receiver endpoint.
        final String[] ipKeys = {"connector.ip", "sink.ip"};
        final String[] portKeys = {"connector.port", "sink.port"};
        ipAddress = pipeParameters.getStringByKeys(ipKeys);
        port = pipeParameters.getIntByKeys(portKeys);

        // Credentials and protocol version.
        user =
            pipeParameters.getStringOrDefault(
                Arrays.asList("connector.user", "sink.user", "connector.username", "sink.username"),
                SqlConstant.ROOT);
        password =
            pipeParameters.getStringOrDefault(
                Arrays.asList("connector.password", "sink.password"), SqlConstant.ROOT);
        syncConnectorVersion =
            pipeParameters.getStringOrDefault(
                Arrays.asList("connector.version", "sink.version"), "1.1");

        pipeName = pipeConnectorRuntimeConfiguration.getRuntimeEnvironment().getPipeName();

        // SSL settings.
        useSSL = pipeParameters.getBooleanOrDefault("sink.ssl.enable", false);
        trustStore = pipeParameters.getString("sink.ssl.trust-store-path");
        trustStorePwd = pipeParameters.getString("sink.ssl.trust-store-pwd");
    }

    /**
     * (Re)establishes the connection to the receiver.
     *
     * <p>Any previous client and session pool are closed first. A new sync client is created and
     * its handshake is performed; only when the receiver answers with the success status is the
     * session pool built.
     *
     * @throws PipeRuntimeCriticalException if the receiver rejects the handshake
     * @throws PipeConnectionException if a thrift error occurs while connecting
     * @throws Exception propagated from {@link #close()}
     */
    public void handshake() throws Exception {
        close();
        try {
            final ThriftClientProperty clientProperty =
                new ThriftClientProperty.Builder()
                    .setConnectionTimeoutMs(COMMON_CONFIG.getDnConnectionTimeoutInMS())
                    .setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
                    .build();
            client = new IoTDBSyncClient(clientProperty, ipAddress, port, useSSL, trustStore, trustStorePwd);

            final TSyncIdentityInfo identity =
                new TSyncIdentityInfo(pipeName, System.currentTimeMillis(), syncConnectorVersion, SqlConstant.ROOT);
            final TSStatus status = client.handshake(identity);

            if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                final String rejection =
                    String.format(
                        "The receiver %s:%s rejected the pipe task because %s",
                        ipAddress, port, status.message);
                LOGGER.warn(rejection);
                throw new PipeRuntimeCriticalException(rejection);
            }

            sessionPool =
                new SessionPool.Builder()
                    .host(ipAddress)
                    .port(port)
                    .user(user)
                    .password(password)
                    .maxSize(1)
                    .useSSL(useSSL)
                    .trustStore(trustStore)
                    .trustStorePwd(trustStorePwd)
                    .build();
        } catch (TException e) {
            throw new PipeConnectionException(
                String.format(
                    "Error occurred while connecting to receiver %s:%s, please check network connectivity or SSL configurations when enable SSL transmission",
                    ipAddress, port),
                e);
        }
    }

    /** No-op: this connector performs no work on heartbeat. */
    public void heartbeat() throws Exception {
    }

    /**
     * Transfers a tablet insertion event by dispatching on its concrete type.
     *
     * @param tabletInsertionEvent the event to transfer; must be a
     *     {@link PipeInsertNodeTabletInsertionEvent} or a {@link PipeRawTabletInsertionEvent}
     * @throws NotImplementedException if the event is of any other type
     * @throws Exception propagated from the underlying transfer
     */
    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
            doTransferWrapper((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
            return;
        }
        if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
            doTransferWrapper((PipeRawTabletInsertionEvent) tabletInsertionEvent);
            return;
        }
        // The message names the two classes actually accepted above.
        throw new NotImplementedException("IoTDBLegacyPipeConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
    }

    /**
     * Transfers a TsFile insertion event.
     *
     * <p>If {@code waitForTsFileClose()} returns false the file is skipped with a warning.
     *
     * @param tsFileInsertionEvent the event to transfer; must be a {@link PipeTsFileInsertionEvent}
     * @throws NotImplementedException if the event is of any other type
     * @throws PipeConnectionException if a thrift error occurs during the transfer
     * @throws Exception propagated from the underlying transfer
     */
    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            throw new NotImplementedException("IoTDBLegacyPipeConnector only support PipeTsFileInsertionEvent.");
        }
        final PipeTsFileInsertionEvent tsFileEvent = (PipeTsFileInsertionEvent) tsFileInsertionEvent;

        if (tsFileEvent.waitForTsFileClose()) {
            try {
                doTransferWrapper(tsFileEvent);
            } catch (TException e) {
                throw new PipeConnectionException(
                    String.format(
                        "Network error when transfer tsFile insertion event: %s.",
                        tsFileEvent.coreReportMessage()),
                    e);
            }
            return;
        }

        LOGGER.warn("Pipe skipping temporary TsFile which shouldn't be transferred: {}", tsFileEvent.getTsFile());
    }

    /**
     * Handles generic events: heartbeat and terminate events are silently ignored, anything else
     * is logged as unsupported.
     *
     * @param event the event handed to this connector
     */
    public void transfer(Event event) throws Exception {
        final boolean silentlyIgnored =
            event instanceof PipeHeartbeatEvent || event instanceof PipeTerminateEvent;
        if (!silentlyIgnored) {
            LOGGER.warn("IoTDBLegacyPipeConnector does not support transferring generic event: {}.", event);
        }
    }

    /**
     * Runs the insert-node transfer while holding a reference on the event.
     *
     * <p>Nothing is done when the reference count cannot be increased; otherwise the reference is
     * always released afterwards, whether or not the transfer succeeded.
     */
    private void doTransferWrapper(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws IoTDBConnectionException, StatementExecutionException {
        final String holder = IoTDBLegacyPipeConnector.class.getName();
        if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(holder)) {
            return;
        }
        try {
            doTransfer(pipeInsertNodeTabletInsertionEvent);
        } finally {
            pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(holder, false);
        }
    }

    /**
     * Converts the event to tablets and inserts each non-null, non-empty one through the session
     * pool, using the aligned insert when the event reports the tablet at that index as aligned.
     */
    private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws IoTDBConnectionException, StatementExecutionException {
        final List<Tablet> tablets = pipeInsertNodeTabletInsertionEvent.convertToTablets();
        // Index-based loop: the alignment flag is looked up by tablet position.
        for (int index = 0; index < tablets.size(); index++) {
            final Tablet current = tablets.get(index);
            if (current == null || current.rowSize == 0) {
                continue;
            }
            if (pipeInsertNodeTabletInsertionEvent.isAligned(index)) {
                sessionPool.insertAlignedTablet(current);
            } else {
                sessionPool.insertTablet(current);
            }
        }
    }

    /**
     * Runs the raw-tablet transfer while holding a reference on the event.
     *
     * <p>Nothing is done when the reference count cannot be increased; otherwise the reference is
     * always released afterwards, whether or not the transfer succeeded.
     */
    private void doTransferWrapper(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException, IoTDBConnectionException, StatementExecutionException {
        final String holder = IoTDBLegacyPipeConnector.class.getName();
        if (!pipeRawTabletInsertionEvent.increaseReferenceCount(holder)) {
            return;
        }
        try {
            doTransfer(pipeRawTabletInsertionEvent);
        } finally {
            pipeRawTabletInsertionEvent.decreaseReferenceCount(holder, false);
        }
    }

    /**
     * Converts the event to a single tablet and inserts it through the session pool, using the
     * aligned insert when the event is aligned.
     *
     * <p>A null or empty tablet is skipped, matching the guard applied to each tablet on the
     * insert-node path.
     */
    private void doTransfer(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException, IoTDBConnectionException, StatementExecutionException {
        final Tablet tablet = pipeRawTabletInsertionEvent.convertToTablet();
        if (tablet == null || tablet.rowSize == 0) {
            // Nothing to send.
            return;
        }
        if (pipeRawTabletInsertionEvent.isAligned()) {
            this.sessionPool.insertAlignedTablet(tablet);
        } else {
            this.sessionPool.insertTablet(tablet);
        }
    }

    /**
     * Runs the TsFile transfer while holding a reference on the event.
     *
     * <p>Nothing is done when the reference count cannot be increased; otherwise the reference is
     * always released afterwards, whether or not the transfer succeeded.
     */
    private void doTransferWrapper(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, TException, IOException {
        final String holder = IoTDBLegacyPipeConnector.class.getName();
        if (!pipeTsFileInsertionEvent.increaseReferenceCount(holder)) {
            return;
        }
        try {
            doTransfer(pipeTsFileInsertionEvent);
        } finally {
            pipeTsFileInsertionEvent.decreaseReferenceCount(holder, false);
        }
    }

    /**
     * Sends the event's TsFile piece by piece, then sends a serialized {@link TsFilePipeData}
     * carrying the file name through the sync client.
     */
    private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, TException, IOException {
        final File fileToSend = pipeTsFileInsertionEvent.getTsFile();
        transportSingleFilePieceByPiece(fileToSend);

        final TsFilePipeData pipeData =
            new TsFilePipeData(SubStringFunctionColumnTransformer.EMPTY_STRING, fileToSend.getName(), -1L);
        final ByteBuffer payload = ByteBuffer.wrap(pipeData.serialize());
        client.sendPipeData(payload);
    }

    /* JADX WARN: Code restructure failed: missing block: B:19:0x00a5, code lost:
    
        r0 = java.lang.String.format("Network failed to receive tsFile %s, status: %s", r9, r0);
        org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector.LOGGER.warn(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:20:0x00cd, code lost:
    
        throw new org.apache.iotdb.pipe.api.exception.PipeConnectionException(r0);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Decompiler placeholder: the real body could not be reconstructed, so this method
     * unconditionally throws {@link UnsupportedOperationException}.
     *
     * <p>It is called by {@code doTransfer(PipeTsFileInsertionEvent)} before the pipe data is
     * sent, so every TsFile transfer through this source fails at this call.
     *
     * <p>NOTE(review): the surviving fragments in the JADX warnings above suggest the original
     * logged a warning and threw a {@code PipeConnectionException} with the message
     * "Network failed to receive tsFile %s, status: %s" on some failure path. The full logic must
     * be restored from the upstream source or the bytecode dump before this class is used.
     *
     * @param r9 the file to transport (decompiler-assigned parameter name)
     * @throws java.io.IOException declared by the original signature
     */
    private void transportSingleFilePieceByPiece(java.io.File r9) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 290
            To view this dump add '--comments-level debug' option
        */
        // Placeholder emitted by the decompiler in place of the lost method body.
        throw new UnsupportedOperationException("Method not decompiled: org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector.transportSingleFilePieceByPiece(java.io.File):void");
    }

    /**
     * Releases the sync client and the session pool, if present.
     *
     * <p>Both resources are always attempted and both references are always cleared, even when
     * closing one of them throws. Otherwise a failing {@code client.close()} would leave the
     * session pool open and the stale client in place, and {@link #handshake()} (which calls this
     * method first) would hit the same failure again on every retry.
     *
     * @throws Exception the failure raised while closing; if both resources fail, the session
     *     pool's exception is the one propagated
     */
    public void close() throws Exception {
        try {
            if (this.client != null) {
                try {
                    this.client.close();
                } finally {
                    // Drop the reference even on failure so a later handshake starts clean.
                    this.client = null;
                }
            }
        } finally {
            if (this.sessionPool != null) {
                try {
                    this.sessionPool.close();
                } finally {
                    this.sessionPool = null;
                }
            }
        }
    }
}
