package io.deltastream.flink.connector.snowflake.sink;

import io.deltastream.flink.connector.snowflake.sink.context.SnowflakeSinkContext;
import io.deltastream.flink.connector.snowflake.sink.serialization.SnowflakeRowSerializationSchema;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.guava32.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
/* loaded from: input_file:io/deltastream/flink/connector/snowflake/sink/SnowflakeSinkITCase.class */
class SnowflakeSinkITCase {
    private static final String url = SystemUtils.getEnvironmentVariable("SNOWFLAKE_URL", "fake.sf.com:443");
    private static final String user = SystemUtils.getEnvironmentVariable("SNOWFLAKE_USER", "SF_USER");
    private static final String role = SystemUtils.getEnvironmentVariable("SNOWFLAKE_ROLE", "SF_ROLE");
    private static final String key = SystemUtils.getEnvironmentVariable("SNOWFLAKE_PRIVATE_KEY", "");
    private static final String keyPass = SystemUtils.getEnvironmentVariable("SNOWFLAKE_KEY_PASSPHRASE", "");

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).build());

    /* loaded from: input_file:io/deltastream/flink/connector/snowflake/sink/SnowflakeSinkITCase$RowPassThroughSerializer.class */
    private static class RowPassThroughSerializer implements SnowflakeRowSerializationSchema<Map<String, Object>> {
        private static final long serialVersionUID = -23875899103249615L;

        private RowPassThroughSerializer() {
        }

        public Map<String, Object> serialize(Map<String, Object> map, SnowflakeSinkContext snowflakeSinkContext) {
            return map;
        }
    }

    /* loaded from: input_file:io/deltastream/flink/connector/snowflake/sink/SnowflakeSinkITCase$SfRowMapFunction.class */
    private static class SfRowMapFunction implements MapFunction<Long, Map<String, Object>> {
        private static final long serialVersionUID = -2836417330784371895L;

        private SfRowMapFunction() {
        }

        public Map<String, Object> map(Long l) {
            String uuid = UUID.randomUUID().toString();
            return Maps.newHashMap(Map.of("\"id\"", uuid + "-" + l, "\"data\"", uuid + "_" + l));
        }
    }

    SnowflakeSinkITCase() {
    }

    @Test
    public void testSuccessfulWriteToSnowflake() throws Exception {
        SnowflakeSinkBuilder serializationSchema = SnowflakeSink.builder().url(url).user(user).role(role).bufferTimeMillis(2000L).database("FLINK_STREAMING").schema("PUBLIC").table("\"stream_data_tbl\"").serializationSchema(new RowPassThroughSerializer());
        if (StringUtils.isNotBlank(key)) {
            serializationSchema.privateKey(key);
        }
        if (StringUtils.isNotBlank(keyPass)) {
            serializationSchema.keyPassphrase(keyPass);
        }
        SnowflakeSink build = serializationSchema.build("sf_sink_job");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(Configuration.fromMap(Map.of(RestartStrategyOptions.RESTART_STRATEGY.key(), RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY.getMainValue())));
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromSequence(1L, 10L).map(new SfRowMapFunction()).sinkTo(build);
        executionEnvironment.execute();
    }
}
