package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.class */
public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
    private final Map<Pair<String, Long>, Double> pipeName2WeightMap;
    private final List<EnrichedEvent> events;
    private final AtomicInteger eventsReferenceCount;
    private final AtomicBoolean eventsHadBeenAddedToRetryQueue;
    private final File tsFile;
    private final File modFile;
    private File currentFile;
    private final boolean transferMod;
    private final String dataBaseName;
    private final int readFileBufferSize;
    private final PipeTsFileMemoryBlock memoryBlock;
    private final byte[] readBuffer;
    private long position;
    private RandomAccessFile reader;
    private final AtomicBoolean isSealSignalSent;
    private IoTDBDataNodeAsyncClientManager clientManager;
    private volatile AsyncPipeDataTransferServiceClient client;

    public PipeTransferTsFileHandler(IoTDBDataRegionAsyncConnector ioTDBDataRegionAsyncConnector, Map<Pair<String, Long>, Double> map, List<EnrichedEvent> list, AtomicInteger atomicInteger, AtomicBoolean atomicBoolean, File file, File file2, boolean z, String str) throws FileNotFoundException, InterruptedException {
        super(ioTDBDataRegionAsyncConnector);
        this.pipeName2WeightMap = map;
        this.events = list;
        this.eventsReferenceCount = atomicInteger;
        this.eventsHadBeenAddedToRetryQueue = atomicBoolean;
        this.tsFile = file;
        this.modFile = file2;
        this.transferMod = z;
        this.dataBaseName = str;
        this.currentFile = z ? file2 : file;
        waitForResourceEnough4Slicing((long) ((1.0d + Math.random()) * 20.0d * 1000.0d));
        this.readFileBufferSize = (int) Math.min(PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(), z ? Math.max(file.length(), file2.length()) : file.length());
        this.memoryBlock = PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled() ? this.readFileBufferSize : 0L);
        this.readBuffer = new byte[this.readFileBufferSize];
        this.position = 0L;
        this.reader = Objects.nonNull(file2) ? new RandomAccessFile(file2, "r") : new RandomAccessFile(file, "r");
        this.isSealSignalSent = new AtomicBoolean(false);
    }

    public void transfer(IoTDBDataNodeAsyncClientManager ioTDBDataNodeAsyncClientManager, AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient) throws TException, IOException {
        this.clientManager = ioTDBDataNodeAsyncClientManager;
        this.client = asyncPipeDataTransferServiceClient;
        if (asyncPipeDataTransferServiceClient == null) {
            LOGGER.warn("Client has been returned to the pool. Current handler status is {}. Will not transfer {}.", this.connector.isClosed() ? "CLOSED" : "NOT CLOSED", this.tsFile);
            return;
        }
        asyncPipeDataTransferServiceClient.setShouldReturnSelf(false);
        asyncPipeDataTransferServiceClient.setTimeoutDynamically(ioTDBDataNodeAsyncClientManager.getConnectionTimeout());
        int read = this.reader.read(this.readBuffer);
        if (read != -1) {
            byte[] copyOfRange = read == this.readFileBufferSize ? this.readBuffer : Arrays.copyOfRange(this.readBuffer, 0, read);
            TPipeTransferReq tPipeTransferReq = this.transferMod ? PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(this.currentFile.getName(), this.position, copyOfRange) : PipeTransferTsFilePieceReq.toTPipeTransferReq(this.currentFile.getName(), this.position, copyOfRange);
            TPipeTransferReq tPipeTransferReq2 = this.connector.isRpcCompressionEnabled() ? PipeTransferCompressedReq.toTPipeTransferReq(tPipeTransferReq, this.connector.getCompressors()) : tPipeTransferReq;
            this.pipeName2WeightMap.forEach((pair, d) -> {
                this.connector.rateLimitIfNeeded((String) pair.getLeft(), ((Long) pair.getRight()).longValue(), asyncPipeDataTransferServiceClient.getEndPoint(), (long) (tPipeTransferReq2.getBody().length * d.doubleValue()));
            });
            if (tryTransfer(asyncPipeDataTransferServiceClient, tPipeTransferReq2)) {
                this.position += read;
                return;
            }
            return;
        }
        if (this.currentFile == this.modFile) {
            this.currentFile = this.tsFile;
            this.position = 0L;
            try {
                this.reader.close();
            } catch (IOException e) {
                LOGGER.warn("Failed to close file reader when successfully transferred mod file.", e);
            }
            this.reader = new RandomAccessFile(this.tsFile, "r");
            transfer(ioTDBDataNodeAsyncClientManager, asyncPipeDataTransferServiceClient);
            return;
        }
        if (this.currentFile == this.tsFile) {
            this.isSealSignalSent.set(true);
            PipeTransferTsFileSealWithModReq tPipeTransferReq3 = this.transferMod ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(this.modFile.getName(), this.modFile.length(), this.tsFile.getName(), this.tsFile.length(), this.dataBaseName) : PipeTransferTsFileSealWithModReq.toTPipeTransferReq(this.tsFile.getName(), this.tsFile.length(), this.dataBaseName);
            PipeTransferTsFileSealWithModReq tPipeTransferReq4 = this.connector.isRpcCompressionEnabled() ? PipeTransferCompressedReq.toTPipeTransferReq(tPipeTransferReq3, this.connector.getCompressors()) : tPipeTransferReq3;
            this.pipeName2WeightMap.forEach((pair2, d2) -> {
                this.connector.rateLimitIfNeeded((String) pair2.getLeft(), ((Long) pair2.getRight()).longValue(), asyncPipeDataTransferServiceClient.getEndPoint(), (long) (tPipeTransferReq4.getBody().length * d2.doubleValue()));
            });
            if (tryTransfer(asyncPipeDataTransferServiceClient, tPipeTransferReq4)) {
            }
        }
    }

    @Override // org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTrackableHandler
    public void onComplete(TPipeTransferResp tPipeTransferResp) {
        try {
            super.onComplete(tPipeTransferResp);
        } finally {
            if (this.connector.isClosed()) {
                returnClientIfNecessary();
            }
        }
    }

    @Override // org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTrackableHandler
    protected boolean onCompleteInternal(TPipeTransferResp tPipeTransferResp) {
        if (!this.isSealSignalSent.get()) {
            try {
                PipeTransferFilePieceResp fromTPipeTransferResp = PipeTransferFilePieceResp.fromTPipeTransferResp(tPipeTransferResp);
                if (fromTPipeTransferResp.getStatus().getCode() == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
                    this.position = fromTPipeTransferResp.getEndWritingOffset();
                    this.reader.seek(this.position);
                    LOGGER.info("Redirect file position to {}.", Long.valueOf(this.position));
                } else {
                    TSStatus status = tPipeTransferResp.getStatus();
                    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                        this.connector.statusHandler().handle(status, tPipeTransferResp.getStatus().getMessage(), this.tsFile.getName());
                    }
                }
                transfer(this.clientManager, this.client);
                return false;
            } catch (Exception e) {
                onError(e);
                return false;
            }
        }
        try {
            TSStatus status2 = tPipeTransferResp.getStatus();
            if (status2.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status2.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                this.connector.statusHandler().handle(status2, String.format("Seal file %s error, result status %s.", this.tsFile, tPipeTransferResp.getStatus()), this.tsFile.getName());
            }
            try {
                try {
                    if (this.reader != null) {
                        this.reader.close();
                    }
                    if (this.events.stream().anyMatch(enrichedEvent -> {
                        return !(enrichedEvent instanceof PipeTsFileInsertionEvent);
                    })) {
                        RetryUtils.retryOnException(() -> {
                            FileUtils.delete(this.currentFile);
                            return null;
                        });
                    }
                    int decrementAndGet = this.eventsReferenceCount.decrementAndGet();
                    if (decrementAndGet <= 0) {
                        this.events.forEach(enrichedEvent2 -> {
                            enrichedEvent2.decreaseReferenceCount(PipeTransferTsFileHandler.class.getName(), true);
                        });
                    }
                    if (this.events.size() <= 1 || LOGGER.isDebugEnabled()) {
                        LOGGER.info("Successfully transferred file {} (committer key={}, commit id={}, reference count={}).", new Object[]{this.tsFile, this.events.stream().map((v0) -> {
                            return v0.getCommitterKey();
                        }).collect(Collectors.toList()), this.events.stream().map((v0) -> {
                            return v0.getCommitId();
                        }).collect(Collectors.toList()), Integer.valueOf(decrementAndGet)});
                    } else {
                        LOGGER.info("Successfully transferred file {} (batched TableInsertionEvents, reference count={}).", this.tsFile, Integer.valueOf(decrementAndGet));
                    }
                    returnClientIfNecessary();
                    return true;
                } catch (IOException e2) {
                    LOGGER.warn("Failed to close file reader or delete tsFile when successfully transferred file.", e2);
                    int decrementAndGet2 = this.eventsReferenceCount.decrementAndGet();
                    if (decrementAndGet2 <= 0) {
                        this.events.forEach(enrichedEvent22 -> {
                            enrichedEvent22.decreaseReferenceCount(PipeTransferTsFileHandler.class.getName(), true);
                        });
                    }
                    if (this.events.size() <= 1 || LOGGER.isDebugEnabled()) {
                        LOGGER.info("Successfully transferred file {} (committer key={}, commit id={}, reference count={}).", new Object[]{this.tsFile, this.events.stream().map((v0) -> {
                            return v0.getCommitterKey();
                        }).collect(Collectors.toList()), this.events.stream().map((v0) -> {
                            return v0.getCommitId();
                        }).collect(Collectors.toList()), Integer.valueOf(decrementAndGet2)});
                    } else {
                        LOGGER.info("Successfully transferred file {} (batched TableInsertionEvents, reference count={}).", this.tsFile, Integer.valueOf(decrementAndGet2));
                    }
                    returnClientIfNecessary();
                    return true;
                }
            } catch (Throwable th) {
                int decrementAndGet3 = this.eventsReferenceCount.decrementAndGet();
                if (decrementAndGet3 <= 0) {
                    this.events.forEach(enrichedEvent222 -> {
                        enrichedEvent222.decreaseReferenceCount(PipeTransferTsFileHandler.class.getName(), true);
                    });
                }
                if (this.events.size() <= 1 || LOGGER.isDebugEnabled()) {
                    LOGGER.info("Successfully transferred file {} (committer key={}, commit id={}, reference count={}).", new Object[]{this.tsFile, this.events.stream().map((v0) -> {
                        return v0.getCommitterKey();
                    }).collect(Collectors.toList()), this.events.stream().map((v0) -> {
                        return v0.getCommitId();
                    }).collect(Collectors.toList()), Integer.valueOf(decrementAndGet3)});
                } else {
                    LOGGER.info("Successfully transferred file {} (batched TableInsertionEvents, reference count={}).", this.tsFile, Integer.valueOf(decrementAndGet3));
                }
                returnClientIfNecessary();
                throw th;
            }
        } catch (Exception e3) {
            onError(e3);
            return false;
        }
    }

    @Override // org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTrackableHandler
    public void onError(Exception exc) {
        try {
            super.onError(exc);
        } finally {
            returnClientIfNecessary();
        }
    }

    @Override // org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTrackableHandler
    protected void onErrorInternal(Exception exc) {
        try {
            if (this.events.size() <= 1 || LOGGER.isDebugEnabled()) {
                LOGGER.warn("Failed to transfer TsFileInsertionEvent {} (committer key {}, commit id {}).", new Object[]{this.tsFile, this.events.stream().map((v0) -> {
                    return v0.getCommitterKey();
                }).collect(Collectors.toList()), this.events.stream().map((v0) -> {
                    return v0.getCommitId();
                }).collect(Collectors.toList()), exc});
            } else {
                LOGGER.warn("Failed to transfer TsFileInsertionEvent {} (batched TableInsertionEvents)", this.tsFile, exc);
            }
        } catch (Exception e) {
            LOGGER.warn("Failed to log error when failed to transfer file.", e);
        }
        try {
            if (Objects.nonNull(this.clientManager)) {
                this.clientManager.adjustTimeoutIfNecessary(exc);
            }
        } catch (Exception e2) {
            LOGGER.warn("Failed to adjust timeout when failed to transfer file.", e2);
        }
        try {
            try {
                if (this.reader != null) {
                    this.reader.close();
                }
                if (this.events.stream().anyMatch(enrichedEvent -> {
                    return !(enrichedEvent instanceof PipeTsFileInsertionEvent);
                })) {
                    RetryUtils.retryOnException(() -> {
                        FileUtils.delete(this.currentFile);
                        return null;
                    });
                }
                try {
                    returnClientIfNecessary();
                    if (this.eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
                        this.connector.addFailureEventsToRetryQueue(this.events);
                    }
                } finally {
                }
            } catch (Throwable th) {
                try {
                    returnClientIfNecessary();
                    throw th;
                } finally {
                    if (this.eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
                        this.connector.addFailureEventsToRetryQueue(this.events);
                    }
                }
            }
        } catch (IOException e3) {
            LOGGER.warn("Failed to close file reader or delete tsFile when failed to transfer file.", e3);
            try {
                returnClientIfNecessary();
                if (this.eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
                    this.connector.addFailureEventsToRetryQueue(this.events);
                }
            } finally {
                if (this.eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
                    this.connector.addFailureEventsToRetryQueue(this.events);
                }
            }
        }
    }

    private void returnClientIfNecessary() {
        if (this.client != null) {
            this.client.setShouldReturnSelf(true);
            this.client.returnSelf();
            this.client = null;
        }
    }

    @Override // org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTrackableHandler
    protected void doTransfer(AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient, TPipeTransferReq tPipeTransferReq) throws TException {
        if (asyncPipeDataTransferServiceClient == null) {
            LOGGER.warn("Client has been returned to the pool. Current handler status is {}. Will not transfer {}.", this.connector.isClosed() ? "CLOSED" : "NOT CLOSED", this.tsFile);
        } else {
            asyncPipeDataTransferServiceClient.pipeTransfer(tPipeTransferReq, this);
        }
    }

    @Override // org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTrackableHandler
    public void clearEventsReferenceCount() {
        this.events.forEach(enrichedEvent -> {
            enrichedEvent.clearReferenceCount(PipeTransferTsFileHandler.class.getName());
        });
    }

    @Override // org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTrackableHandler, java.lang.AutoCloseable
    public void close() {
        super.close();
        this.memoryBlock.close();
    }

    private void waitForResourceEnough4Slicing(long j) throws InterruptedException {
        if (PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()) {
            PipeMemoryManager memory = PipeDataNodeResourceManager.memory();
            if (memory.isEnough4TsFileSlicing()) {
                return;
            }
            long currentTimeMillis = System.currentTimeMillis();
            long pipeCheckMemoryEnoughIntervalMs = PipeConfig.getInstance().getPipeCheckMemoryEnoughIntervalMs();
            while (!memory.isEnough4TsFileSlicing()) {
                Thread.sleep(pipeCheckMemoryEnoughIntervalMs);
                long currentTimeMillis2 = System.currentTimeMillis();
                double d = (currentTimeMillis2 - currentTimeMillis) / 1000.0d;
                double d2 = (currentTimeMillis2 - r0) / 1000.0d;
                if (d > 10.0d) {
                    LOGGER.info("Wait for resource enough for slicing tsfile {} for {} seconds.", this.tsFile, Double.valueOf(d2));
                    currentTimeMillis = currentTimeMillis2;
                } else if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Wait for resource enough for slicing tsfile {} for {} seconds.", this.tsFile, Double.valueOf(d2));
                }
                if (d2 * 1000.0d > j) {
                    throw new PipeException(String.format("TimeoutException: Waited %s seconds", Double.valueOf(d2)));
                }
            }
            LOGGER.info("Wait for resource enough for slicing tsfile {} for {} seconds.", this.tsFile, Double.valueOf((System.currentTimeMillis() - r0) / 1000.0d));
        }
    }
}
