package org.apache.iotdb.tool.tsfile;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.apache.iotdb.cli.utils.IoTPrinter;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.transport.TTransportException;

/* loaded from: input_file:org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.class */
public class ImportTsFileRemotely extends ImportTsFileBase {
    private static final String MODS = ".mods";
    private static final String LOAD_STRATEGY = "sync";
    private IoTDBSyncClient client;
    private static String host;
    private static String port;
    private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out);
    private static final Integer MAX_RETRY_COUNT = 3;
    private static final AtomicInteger CONNECTION_TIMEOUT_MS = new AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
    private static String username = "root";
    private static String password = "root";

    public ImportTsFileRemotely(String str) {
        setTimePrecision(str);
        initClient();
        sendHandshake();
    }

    @Override // org.apache.iotdb.tool.tsfile.ImportTsFileBase
    public void loadTsFile() {
        while (true) {
            try {
                try {
                    String pollFromQueue = ImportTsFileScanTool.pollFromQueue();
                    if (pollFromQueue == null) {
                        return;
                    }
                    File file = new File(pollFromQueue);
                    try {
                        if (ImportTsFileScanTool.isContainModsFile(pollFromQueue + MODS)) {
                            doTransfer(file, new File(pollFromQueue + MODS));
                        } else {
                            doTransfer(file, null);
                        }
                        processSuccessFile(pollFromQueue);
                    } catch (Exception e) {
                        IOT_PRINTER.println("Connect is abort, try to reconnect, max retry count: " + MAX_RETRY_COUNT);
                        boolean z = false;
                        int i = 1;
                        while (true) {
                            if (i > MAX_RETRY_COUNT.intValue()) {
                                break;
                            }
                            try {
                                IOT_PRINTER.println(String.format("The %sth retry will after %s seconds.", Integer.valueOf(i), Integer.valueOf(i * 2)));
                                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(i * 2));
                                close();
                                initClient();
                                sendHandshake();
                                if (ImportTsFileScanTool.isContainModsFile(pollFromQueue + MODS)) {
                                    doTransfer(file, new File(pollFromQueue + MODS));
                                } else {
                                    doTransfer(file, null);
                                }
                                processSuccessFile(pollFromQueue);
                                z = true;
                                IOT_PRINTER.println("Reconnect successful.");
                            } catch (Exception e2) {
                                IOT_PRINTER.println(String.format("The %sth reconnect failed", Integer.valueOf(i)));
                                i++;
                            }
                        }
                        if (!z) {
                            processFailFile(pollFromQueue, e);
                            close();
                            initClient();
                            sendHandshake();
                        }
                    }
                } catch (Exception e3) {
                    IOT_PRINTER.println("Unexpected error occurred: " + e3.getMessage());
                    close();
                    return;
                }
            } finally {
                close();
            }
        }
    }

    public void sendHandshake() {
        try {
            TPipeTransferResp pipeTransfer = this.client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(constructParamsMap()));
            if (pipeTransfer.getStatus().getCode() == TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
                IOT_PRINTER.println(String.format("Handshake error with target server ip: %s, port: %s, because: %s. Retry to handshake by PipeTransferHandshakeV1Req.", this.client.getIpAddress(), Integer.valueOf(this.client.getPort()), pipeTransfer.getStatus()));
                pipeTransfer = this.client.pipeTransfer(PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(getTimePrecision()));
            }
            if (pipeTransfer.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                throw new PipeConnectionException(String.format("Handshake error with target server ip: %s, port: %s, because: %s.", this.client.getIpAddress(), Integer.valueOf(this.client.getPort()), pipeTransfer.getStatus()));
            }
            this.client.setTimeout(CONNECTION_TIMEOUT_MS.get());
            IOT_PRINTER.println(String.format("Handshake success. Target server ip: %s, port: %s", this.client.getIpAddress(), Integer.valueOf(this.client.getPort())));
        } catch (Exception e) {
            throw new PipeException(String.format("Handshake error with target server ip: %s, port: %s, because: %s.", this.client.getIpAddress(), Integer.valueOf(this.client.getPort()), e.getMessage()));
        }
    }

    private Map<String, String> constructParamsMap() {
        HashMap hashMap = new HashMap();
        hashMap.put("timestampPrecision", getTimePrecision());
        hashMap.put("clusterID", getClusterId());
        hashMap.put("convertOnTypeMismatch", Boolean.toString(true));
        hashMap.put("loadTsFileStrategy", LOAD_STRATEGY);
        hashMap.put("username", username);
        hashMap.put("password", password);
        return hashMap;
    }

    public void doTransfer(File file, File file2) throws PipeException, IOException {
        PipeTransferTsFileSealWithModReq tPipeTransferReq;
        if (Objects.nonNull(file2)) {
            transferFilePieces(file2, true);
            transferFilePieces(file, true);
            tPipeTransferReq = PipeTransferTsFileSealWithModReq.toTPipeTransferReq(file2.getName(), file2.length(), file.getName(), file.length());
        } else {
            transferFilePieces(file, false);
            tPipeTransferReq = PipeTransferTsFileSealReq.toTPipeTransferReq(file.getName(), file.length());
        }
        try {
            TSStatus status = this.client.pipeTransfer(tPipeTransferReq).getStatus();
            if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                throw new PipeConnectionException(String.format("Seal file %s error, result status %s.", file, status));
            }
            IOT_PRINTER.println("Successfully transferred file " + file);
        } catch (Exception e) {
            throw new PipeConnectionException(String.format("Network error when seal file %s, because %s.", file, e.getMessage()), e);
        }
    }

    private void transferFilePieces(File file, boolean z) throws PipeException, IOException {
        PipeTransferFilePieceReq transferMultiFilePieceReq;
        int pipeConnectorReadFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
        byte[] bArr = new byte[pipeConnectorReadFileBufferSize];
        long j = 0;
        RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");
        while (true) {
            try {
                int read = randomAccessFile.read(bArr);
                if (read == -1) {
                    randomAccessFile.close();
                    return;
                }
                byte[] copyOfRange = read == pipeConnectorReadFileBufferSize ? bArr : Arrays.copyOfRange(bArr, 0, read);
                if (z) {
                    try {
                        transferMultiFilePieceReq = getTransferMultiFilePieceReq(file.getName(), j, copyOfRange);
                    } catch (Exception e) {
                        throw new PipeConnectionException(String.format("Network error when transfer file %s, because %s.", file, e.getMessage()), e);
                    }
                } else {
                    transferMultiFilePieceReq = getTransferSingleFilePieceReq(file.getName(), j, copyOfRange);
                }
                PipeTransferFilePieceResp fromTPipeTransferResp = PipeTransferFilePieceResp.fromTPipeTransferResp(this.client.pipeTransfer(transferMultiFilePieceReq));
                j += read;
                TSStatus status = fromTPipeTransferResp.getStatus();
                if (status.getCode() == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
                    j = fromTPipeTransferResp.getEndWritingOffset();
                    randomAccessFile.seek(j);
                    IOT_PRINTER.println(String.format("Redirect file position to %s.", Long.valueOf(j)));
                } else {
                    if (status.getCode() == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
                        sendHandshake();
                    }
                    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                        throw new PipeException(String.format("Transfer file %s error, result status %s.", file, status));
                    }
                }
            } catch (Throwable th) {
                try {
                    randomAccessFile.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
    }

    private PipeTransferFilePieceReq getTransferMultiFilePieceReq(String str, long j, byte[] bArr) throws IOException {
        return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(str, j, bArr);
    }

    private PipeTransferFilePieceReq getTransferSingleFilePieceReq(String str, long j, byte[] bArr) throws IOException {
        return PipeTransferTsFilePieceReq.toTPipeTransferReq(str, j, bArr);
    }

    private void initClient() {
        try {
            this.client = new IoTDBSyncClient(new ThriftClientProperty.Builder().setConnectionTimeoutMs(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs()).setRpcThriftCompressionEnabled(PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled()).build(), getEndPoint().getIp(), getEndPoint().getPort(), false, "", "");
        } catch (TTransportException e) {
            throw new PipeException("Sync client init error because " + e.getMessage());
        }
    }

    private TEndPoint getEndPoint() {
        return new TEndPoint(host, Integer.parseInt(port));
    }

    private String getClusterId() {
        byte[] bArr = new byte[32];
        new SecureRandom().nextBytes(bArr);
        return "TSFILE-IMPORTER-" + UUID.nameUUIDFromBytes(bArr);
    }

    private void close() {
        try {
            if (this.client != null) {
                this.client.close();
            }
        } catch (Exception e) {
            IOT_PRINTER.println("Failed to close client because " + e.getMessage());
        }
    }

    public static void setHost(String str) {
        host = str;
    }

    public static void setPort(String str) {
        port = str;
    }

    public static void setUsername(String str) {
        username = str;
    }

    public static void setPassword(String str) {
        password = str;
    }
}
