package org.apache.flink.shaded.net.snowflake.ingest.streaming.internal;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.shaded.net.snowflake.client.core.ExecTimeTelemetryData;
import org.apache.flink.shaded.net.snowflake.client.core.HttpClientSettingsKey;
import org.apache.flink.shaded.net.snowflake.client.core.OCSPMode;
import org.apache.flink.shaded.net.snowflake.client.jdbc.RestRequest;
import org.apache.flink.shaded.net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.flink.shaded.net.snowflake.client.jdbc.SnowflakeUtil;
import org.apache.flink.shaded.net.snowflake.client.jdbc.cloud.storage.SnowflakeStorageClient;
import org.apache.flink.shaded.net.snowflake.client.jdbc.cloud.storage.StageInfo;
import org.apache.flink.shaded.net.snowflake.client.jdbc.cloud.storage.StorageClientFactory;
import org.apache.flink.shaded.net.snowflake.client.jdbc.internal.amazonaws.SDKGlobalConfiguration;
import org.apache.flink.shaded.net.snowflake.client.jdbc.internal.apache.http.client.HttpResponseException;
import org.apache.flink.shaded.net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse;
import org.apache.flink.shaded.net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPut;
import org.apache.flink.shaded.net.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder;
import org.apache.flink.shaded.net.snowflake.client.jdbc.internal.apache.http.entity.ByteArrayEntity;
import org.apache.flink.shaded.net.snowflake.client.jdbc.internal.apache.http.util.EntityUtils;
import org.apache.flink.shaded.net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.net.snowflake.client.jdbc.internal.google.api.client.http.HttpStatusCodes;
import org.apache.flink.shaded.net.snowflake.ingest.connection.IngestResponseException;
import org.apache.flink.shaded.net.snowflake.ingest.internal.apache.hadoop.security.IdMappingConstant;
import org.apache.flink.shaded.net.snowflake.ingest.streaming.internal.GeneratePresignedUrlsResponse;
import org.apache.flink.shaded.net.snowflake.ingest.utils.ErrorCode;
import org.apache.flink.shaded.net.snowflake.ingest.utils.HttpUtil;
import org.apache.flink.shaded.net.snowflake.ingest.utils.Logging;
import org.apache.flink.shaded.net.snowflake.ingest.utils.SFException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/shaded/net/snowflake/ingest/streaming/internal/ExternalVolume.class */
/**
 * {@link IStorage} implementation that uploads blobs to an external volume through URLs obtained
 * from {@link SnowflakeServiceClient#generatePresignedUrls}.
 *
 * <p>URLs are fetched in batches and kept in an in-memory queue. Consumers take one URL per upload
 * via {@link #dequeueUrlInfo()}; the queue is topped up when it runs low. A semaphore bounds the
 * number of concurrent URL-generation requests issued by this instance.
 */
public class ExternalVolume implements IStorage {
    private static final Logging logger = new Logging(ExternalVolume.class);

    /** Number of URLs requested from the service per generate call. */
    private static final int DEFAULT_PRESIGNED_URL_COUNT = 10;

    /** Validity, in seconds, requested for every generated URL. */
    private static final int DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS = 900;

    /** Number of permits of the semaphore guarding URL generation. */
    private static final int MAX_CONCURRENT_GENERATE_URLS_REQUESTS = 10;

    /** Queue size at or below which a non-blocking refill is attempted. */
    private static final int LOW_WATERMARK_FOR_EARLY_REFRESH = 5;

    /** A dequeued URL must stay valid for at least this long, otherwise it is discarded. */
    private static final long MIN_REMAINING_URL_VALIDITY_MILLIS = 60000L;

    /** Upper bound on semaphore acquisition attempts in {@link #generateUrls(int, boolean)}. */
    private static final int MAX_SEMAPHORE_ACQUIRE_ATTEMPTS = 300;

    private final String clientName;
    private final String clientPrefix;
    private final Long deploymentId;
    private final String role;
    private final TableRef tableRef;
    private final SnowflakeServiceClient serviceClient;
    private final FileLocationInfo locationInfo;
    private final SnowflakeFileTransferMetadataWithAge fileTransferMetadata;
    private final ConcurrentLinkedQueue<GeneratePresignedUrlsResponse.PresignedUrlInfo> presignedUrlInfos = new ConcurrentLinkedQueue<>();
    // Tracked separately from the queue; updated after every successful poll and every refill.
    private final AtomicInteger numUrlsInQueue = new AtomicInteger(0);
    private final Semaphore generateUrlsSemaphore = new Semaphore(MAX_CONCURRENT_GENERATE_URLS_REQUESTS);

    /**
     * Creates the volume and eagerly requests a first batch of URLs.
     *
     * @param str client name
     * @param str2 client prefix
     * @param l deployment id, forwarded to the generate-URLs request
     * @param str3 role, forwarded to the generate-URLs request
     * @param tableRef table the URLs are generated for
     * @param fileLocationInfo location of the volume; its credentials map is modified for S3
     * @param snowflakeServiceClient client used to request URLs
     * @throws SFException if the location requests client side encryption, if the file transfer
     *     metadata cannot be built, or if the initial URL generation fails
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ExternalVolume(String str, String str2, Long l, String str3, TableRef tableRef, FileLocationInfo fileLocationInfo, SnowflakeServiceClient snowflakeServiceClient) {
        this.clientName = str;
        this.clientPrefix = str2;
        this.deploymentId = l;
        this.role = str3;
        this.tableRef = tableRef;
        this.serviceClient = snowflakeServiceClient;
        this.locationInfo = fileLocationInfo;
        if (this.locationInfo.getIsClientSideEncrypted()) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, "Cannot ingest into an external volume that requests client side encryption");
        }
        if ("S3".equalsIgnoreCase(this.locationInfo.getLocationType())) {
            // Placeholder credentials are written into the location info for S3.
            // NOTE(review): presumably only needed so the storage client can be constructed,
            // since uploads go through URLs -- confirm.
            this.locationInfo.getCredentials().put("AWS_KEY_ID", "key");
            this.locationInfo.getCredentials().put(SDKGlobalConfiguration.SECRET_KEY_ENV_VAR, "secret");
        }
        try {
            this.fileTransferMetadata = InternalStage.createFileTransferMetadataWithAge(this.locationInfo);
            generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH, false);
        } catch (SnowflakeSQLException | JsonProcessingException | org.apache.flink.shaded.net.snowflake.ingest.internal.fasterxml.jackson.core.JsonProcessingException e) {
            throw new SFException(e, ErrorCode.INTERNAL_ERROR, new Object[0]);
        }
    }

    /**
     * Uploads a blob, either to the local file system or to the remote location.
     *
     * @param blobPath carries the local file name and the remote upload URL
     * @param bArr blob content
     * @throws SFException with {@link ErrorCode#BLOB_UPLOAD_FAILURE} if the remote upload fails
     */
    @Override // org.apache.flink.shaded.net.snowflake.ingest.streaming.internal.IStorage
    public void put(BlobPath blobPath, byte[] bArr) {
        if (this.fileTransferMetadata.isLocalFS) {
            InternalStage.putLocal(this.fileTransferMetadata.localLocation, blobPath.fileName, bArr);
            return;
        }
        try {
            putRemote(blobPath.blobPath, bArr);
        } catch (Throwable th) {
            throw new SFException(th, ErrorCode.BLOB_UPLOAD_FAILURE, new Object[0]);
        }
    }

    /**
     * Issues an HTTP PUT of {@code blob} against {@code uploadUrl}.
     *
     * <p>A non-success status is handed to the storage client's exception handler together with
     * the response body. The response is always closed before this method returns.
     */
    private void putRemote(String uploadUrl, byte[] blob) throws SnowflakeSQLException, URISyntaxException, IOException {
        HttpClientSettingsKey httpClientKey = SnowflakeUtil.convertProxyPropertiesToHttpClientKey(OCSPMode.FAIL_OPEN, HttpUtil.generateProxyPropertiesForJDBC());
        StageInfo stageInfo = this.fileTransferMetadata.fileTransferMetadata.getStageInfo();
        SnowflakeStorageClient storageClient = StorageClientFactory.getFactory().createClient(stageInfo, 1, null, null);
        HttpPut httpPut = new HttpPut(new URIBuilder(uploadUrl).build());
        httpPut.setEntity(new ByteArrayEntity(blob));
        addHeadersToHttpRequest(httpPut, blob, stageInfo, storageClient);
        if (stageInfo.getStageType().equals(StageInfo.StageType.AZURE)) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, "Azure based external volumes are not yet supported.");
        }
        // try-with-resources so the response (and its connection) is released on every path.
        try (CloseableHttpResponse response = RestRequest.execute(org.apache.flink.shaded.net.snowflake.client.core.HttpUtil.getHttpClient(httpClientKey), httpPut, 0L, 0L, (int) org.apache.flink.shaded.net.snowflake.client.core.HttpUtil.getSocketTimeout().toMillis(), 1, 0, null, false, false, false, true, true, new ExecTimeTelemetryData())) {
            int statusCode = response.getStatusLine().getStatusCode();
            if (HttpStatusCodes.isSuccess(statusCode)) {
                return;
            }
            HttpResponseException failure = new HttpResponseException(statusCode, String.format("%s, body: %s", response.getStatusLine().getReasonPhrase(), EntityUtils.toString(response.getEntity())));
            storageClient.handleStorageException(failure, 0, "upload", null, null, null);
        }
    }

    /** Hook for adding request headers before upload; currently adds none. */
    private void addHeadersToHttpRequest(HttpPut httpPut, byte[] bArr, StageInfo stageInfo, SnowflakeStorageClient snowflakeStorageClient) {
    }

    /**
     * Removes and returns the next URL that is still valid for at least
     * {@link #MIN_REMAINING_URL_VALIDITY_MILLIS}. URLs that expire sooner are discarded. While the
     * queue is empty a blocking refill is performed; once a URL is handed out and the queue is at
     * or below the low watermark, a non-blocking refill is attempted.
     *
     * @return a URL info taken from the queue, never {@code null}
     * @throws SFException if URL generation fails
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public GeneratePresignedUrlsResponse.PresignedUrlInfo dequeueUrlInfo() {
        // NOTE(review): the threshold is computed once, before any blocking refill below.
        long minValidUntil = System.currentTimeMillis() + MIN_REMAINING_URL_VALIDITY_MILLIS;
        GeneratePresignedUrlsResponse.PresignedUrlInfo urlInfo;
        int remaining;
        while (true) {
            urlInfo = this.presignedUrlInfos.poll();
            if (urlInfo == null) {
                generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH, true);
                continue;
            }
            remaining = this.numUrlsInQueue.decrementAndGet();
            if (urlInfo.validUntilTimestamp >= minValidUntil) {
                break;
            }
        }
        if (remaining <= LOW_WATERMARK_FOR_EARLY_REFRESH) {
            generateUrls(LOW_WATERMARK_FOR_EARLY_REFRESH, false);
        }
        return urlInfo;
    }

    /**
     * Requests a new batch of URLs and appends the usable ones to the queue.
     *
     * @param minCountToSkipGeneration if the queue already holds at least this many URLs, nothing
     *     is requested
     * @param waitUntilAcquired when {@code true}, retries acquiring the semaphore with a one second
     *     timeout per attempt, up to {@link #MAX_SEMAPHORE_ACQUIRE_ATTEMPTS} attempts; when
     *     {@code false}, gives up silently if a permit is not immediately available
     * @throws SFException if interrupted while waiting, if no permit could be acquired within the
     *     attempt budget, or if the service call fails
     */
    private void generateUrls(int minCountToSkipGeneration, boolean waitUntilAcquired) {
        boolean acquired = false;
        for (int attempt = 0; !acquired && attempt < MAX_SEMAPHORE_ACQUIRE_ATTEMPTS; attempt++) {
            try {
                acquired = this.generateUrlsSemaphore.tryAcquire(waitUntilAcquired ? 1 : 0, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // NOTE(review): Thread.interrupted() also clears the flag, so the interrupt status is
                // not restored for callers -- confirm this is intended.
                String message = String.format("Semaphore acquisition in ExternalVolume.generateUrls was interrupted, likely because the process is shutting down. TableRef=%s Thread.interrupted=%s", this.tableRef, Boolean.valueOf(Thread.interrupted()));
                logger.logError(message);
                // Keep the InterruptedException as the cause.
                throw new SFException(e, ErrorCode.INTERNAL_ERROR, message);
            }
            if (this.numUrlsInQueue.get() >= minCountToSkipGeneration) {
                // Enough URLs are queued already (e.g. refilled by another thread while waiting).
                if (acquired) {
                    this.generateUrlsSemaphore.release();
                }
                return;
            }
            if (!acquired && !waitUntilAcquired) {
                logger.logDebug("Skipping generateUrls because semaphore acquisition failed AND waitUntilAcquired == false.");
                return;
            }
        }
        if (!acquired) {
            String message = String.format("Could not acquire semaphore to generate URLs. TableRef=%s", this.tableRef);
            logger.logError(message);
            throw new SFException(ErrorCode.INTERNAL_ERROR, message);
        }
        try {
            final long validUntilTimestamp = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS);
            // Filter before stamping: stamping dereferences the element, so null entries must be
            // rejected first.
            List<GeneratePresignedUrlsResponse.PresignedUrlInfo> usableUrls = doGenerateUrls(DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS).getPresignedUrlInfos().stream().filter(info -> {
                if (info != null && info.url != null && info.fileName != null && !info.url.isEmpty()) {
                    return true;
                }
                logger.logError("Received unexpected null or empty URL in externalVolume.generateUrls tableRef=%s", this.tableRef);
                return false;
            }).map(info -> {
                info.validUntilTimestamp = validUntilTimestamp;
                return info;
            }).collect(Collectors.toList());
            this.presignedUrlInfos.addAll(usableUrls);
            this.numUrlsInQueue.addAndGet(usableUrls.size());
        } finally {
            this.generateUrlsSemaphore.release();
        }
    }

    /**
     * Calls the service to generate {@link #DEFAULT_PRESIGNED_URL_COUNT} URLs.
     *
     * @param timeoutInSeconds validity requested for the URLs
     * @throws SFException with {@link ErrorCode#GENERATE_PRESIGNED_URLS_FAILURE} if the call fails
     */
    private GeneratePresignedUrlsResponse doGenerateUrls(int timeoutInSeconds) {
        try {
            return this.serviceClient.generatePresignedUrls(new GeneratePresignedUrlsRequest(this.tableRef, this.role, DEFAULT_PRESIGNED_URL_COUNT, timeoutInSeconds, this.deploymentId, true));
        } catch (IOException | IngestResponseException e) {
            throw new SFException(e, ErrorCode.GENERATE_PRESIGNED_URLS_FAILURE, e.getMessage());
        }
    }
}
