package io.deltastream.flink.connector.snowflake.sink.internal;

import io.deltastream.flink.connector.snowflake.sink.config.SnowflakeChannelConfig;
import io.deltastream.flink.connector.snowflake.sink.config.SnowflakeWriterConfig;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.shaded.dev.failsafe.Failsafe;
import org.apache.flink.shaded.dev.failsafe.Fallback;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.shaded.net.snowflake.ingest.streaming.InsertValidationResponse;
import org.apache.flink.shaded.net.snowflake.ingest.streaming.OpenChannelRequest;
import org.apache.flink.shaded.net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import org.apache.flink.shaded.net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import org.apache.flink.shaded.net.snowflake.ingest.utils.SFException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SnowflakeSinkService} implementation that owns one Snowflake streaming ingest client and
 * one ingest channel opened from that client.
 *
 * <p>Rows are submitted one at a time through {@link #insert(Map)}; {@link #flush()} forces the
 * channel to flush via reflection unless the configured delivery guarantee is
 * {@link DeliveryGuarantee#NONE}; {@link #close()} closes the channel and then the client.
 */
@Internal
/* loaded from: input_file:io/deltastream/flink/connector/snowflake/sink/internal/SnowflakeSinkServiceImpl.class */
public class SnowflakeSinkServiceImpl implements SnowflakeSinkService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSinkServiceImpl.class);

    /** Warning emitted when a forced flush could not be awaited or did not complete. */
    private static final String FLUSH_NOT_FINISHED_MESSAGE =
            "Snowflake channel flush did not finish successfully;Flush will happen within the next buffer time of {}ms";

    private final SnowflakeWriterConfig writerConfig;
    private final SnowflakeChannelConfig channelConfig;
    private final SnowflakeStreamingIngestClient client;
    private final String channelName;
    private SnowflakeStreamingIngestChannel channel;
    private final Counter numRecordsSendCounter;
    private final Counter numRecordsSendError;

    /** @return the writer configuration supplied at construction time */
    public SnowflakeWriterConfig getWriterConfig() {
        return this.writerConfig;
    }

    /** @return the channel configuration supplied at construction time */
    public SnowflakeChannelConfig getChannelConfig() {
        return this.channelConfig;
    }

    /** @return the ingest client created by this service */
    public SnowflakeStreamingIngestClient getClient() {
        return this.client;
    }

    /** @return the name under which the ingest channel was opened */
    public String getChannelName() {
        return this.channelName;
    }

    /** @return the ingest channel opened by this service */
    public SnowflakeStreamingIngestChannel getChannel() {
        return this.channel;
    }

    /**
     * Creates the ingest client, derives the channel name, opens the channel and looks up the
     * send/error counters.
     *
     * @param str name prefix passed to the client factory and to the channel-name helper
     * @param i numeric suffix passed to the channel-name helper (presumably the subtask id;
     *     confirm against the caller)
     * @param properties connection properties handed to the client factory
     * @param snowflakeWriterConfig writer configuration; must not be {@code null}
     * @param snowflakeChannelConfig channel configuration; must not be {@code null}
     * @param sinkWriterMetricGroup metric group providing the record counters; must not be
     *     {@code null}
     * @throws NullPointerException if a required argument or the opened channel is {@code null}
     */
    public SnowflakeSinkServiceImpl(String str, int i, Properties properties, SnowflakeWriterConfig snowflakeWriterConfig, SnowflakeChannelConfig snowflakeChannelConfig, SinkWriterMetricGroup sinkWriterMetricGroup) {
        this.writerConfig = (SnowflakeWriterConfig) Preconditions.checkNotNull(snowflakeWriterConfig, "writerConfig");
        this.channelConfig = (SnowflakeChannelConfig) Preconditions.checkNotNull(snowflakeChannelConfig, "channelConfig");
        this.client = createClientFromConfig(str, properties);
        this.channelName = SnowflakeInternalUtils.createClientOrChannelName(null, str, Integer.valueOf(i));
        LOGGER.info("Opening a new ingest channel '{}' for the client '{}'", getChannelName(), getClient().getName());
        this.channel = (SnowflakeStreamingIngestChannel) Preconditions.checkNotNull(openChannelFromConfig());
        SinkWriterMetricGroup metricGroup = (SinkWriterMetricGroup) Preconditions.checkNotNull(sinkWriterMetricGroup, "metricGroup");
        this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
        this.numRecordsSendError = metricGroup.getNumRecordsSendErrorsCounter();
    }

    /**
     * Submits a single row to the ingest channel.
     *
     * <p>The send counter is incremented for every submitted row. If the validation response
     * reports errors, the error counter is increased by the reported error row count and the
     * first reported error is rethrown as an {@link IOException}.
     *
     * @param map the row to submit, keyed by column name
     * @throws IOException if the validation response contains at least one insert error
     */
    @Override // io.deltastream.flink.connector.snowflake.sink.internal.SnowflakeSinkService
    public void insert(Map<String, Object> map) throws IOException {
        InsertValidationResponse response = insertRowWithFallback(map);
        this.numRecordsSendCounter.inc();
        LOGGER.debug("Submitted row to Snowflake ingest channel '{}'", getChannelName());
        if (response.hasErrors()) {
            LOGGER.debug("Encountered error on row submission to Snowflake ingest channel '{}'", getChannelName());
            this.numRecordsSendError.inc(response.getErrorRowCount());
            handleInsertRowsErrors(response.getInsertErrors());
        }
    }

    /**
     * Forces the channel to flush and waits for the flush to finish.
     *
     * <p>Nothing is done when the delivery guarantee is {@link DeliveryGuarantee#NONE}. The
     * channel's {@code flush(boolean)} method is looked up reflectively on the channel's runtime
     * class and called with {@code false}. A flush that cannot be awaited, fails, or is
     * interrupted is only logged as a warning; on interruption the thread's interrupt flag is
     * restored so the caller can still observe it.
     *
     * @throws RuntimeException if the reflective lookup or call of {@code flush(boolean)} fails
     */
    @Override // io.deltastream.flink.connector.snowflake.sink.internal.SnowflakeSinkService
    public void flush() {
        if (DeliveryGuarantee.NONE.equals(getWriterConfig().getDeliveryGuarantee())) {
            LOGGER.info("Skipping force flush for Snowflake ingest channel '{}' for delivery guarantee NONE", getChannelName());
            return;
        }
        LOGGER.debug("Flushing Snowflake ingest channel '{}'", getChannelName());
        Object flushHandle = invoke(getChannel(), "flush", new Class<?>[] {Boolean.TYPE}, new Object[] {Boolean.FALSE});
        if (!(flushHandle instanceof CompletableFuture)) {
            // A null result also lands here (instanceof is false for null), so it must not be dereferenced.
            String handleType = flushHandle == null ? "null" : flushHandle.getClass().getSimpleName();
            LOGGER.warn("Snowflake channel flush did not return a handle to wait on: got {};Flush will happen within the next buffer time of {}ms", handleType, Long.valueOf(getWriterConfig().getMaxBufferTimeMs()));
            return;
        }
        try {
            ((CompletableFuture<?>) flushHandle).get();
            LOGGER.info("Successfully flushed channel '{}'", getChannelName());
        } catch (InterruptedException e) {
            // Restore the interrupt flag; swallowing it would hide the interruption from the caller.
            Thread.currentThread().interrupt();
            LOGGER.warn(FLUSH_NOT_FINISHED_MESSAGE, Long.valueOf(getWriterConfig().getMaxBufferTimeMs()), e);
        } catch (ExecutionException e) {
            LOGGER.warn(FLUSH_NOT_FINISHED_MESSAGE, Long.valueOf(getWriterConfig().getMaxBufferTimeMs()), e);
        }
    }

    /**
     * Creates the ingest client through {@link SnowflakeStreamingIngestClientProvider}.
     *
     * <p>Called from the constructor after the writer configuration has been assigned.
     *
     * @param str client name argument forwarded to the provider
     * @param properties connection properties forwarded to the provider
     * @return the created client
     */
    SnowflakeStreamingIngestClient createClientFromConfig(String str, Properties properties) {
        return SnowflakeStreamingIngestClientProvider.createClient(str, properties, getWriterConfig());
    }

    /**
     * Builds an open-channel request from the channel configuration (database, schema, table and
     * on-error option) and opens the channel on the client.
     *
     * @return the channel returned by the client
     */
    private SnowflakeStreamingIngestChannel openChannelFromConfig() {
        SnowflakeChannelConfig config = getChannelConfig();
        OpenChannelRequest request = OpenChannelRequest.builder(getChannelName())
                .setDBName(config.getDatabaseName())
                .setSchemaName(config.getSchemaName())
                .setTableName(config.getTableName())
                .setOnErrorOption(config.getOnErrorOption())
                .build();
        LOGGER.debug("Opening a '{}' channel for table '{}'", getChannelName(), config.getTableName());
        SnowflakeStreamingIngestChannel opened = getClient().openChannel(request);
        LOGGER.info("Successfully opened channel '{}' for table '{}'", getChannelName(), config.getTableName());
        return opened;
    }

    /**
     * Closes the channel (if it is not already closed) and then the client.
     *
     * <p>A failed or interrupted channel close is logged as an error and does not prevent the
     * client from being closed. The success message for the channel is only logged when the close
     * actually completed. If the wait was interrupted, the interrupt flag is restored once the
     * client close has been attempted, so that the client close itself is not started on an
     * already-interrupted thread.
     *
     * @throws Exception if closing the client fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        boolean interrupted = false;
        if (!getChannel().isClosed()) {
            LOGGER.info("Closing Snowflake ingest channel '{}'", getChannel().getName());
            try {
                getChannel().close().get();
                LOGGER.info("Snowflake ingest channel '{}' has been successfully closed", getChannel().getName());
            } catch (InterruptedException e) {
                interrupted = true;
                LOGGER.error("Failed to cleanly close the Snowflake ingest channel '{}'", getChannelName(), e);
            } catch (ExecutionException e) {
                LOGGER.error("Failed to cleanly close the Snowflake ingest channel '{}'", getChannelName(), e);
            }
        }
        try {
            LOGGER.info("Closing Snowflake ingest client '{}'", getClient().getName());
            IOUtils.closeAll(new AutoCloseable[]{getClient()});
            LOGGER.info("Snowflake ingest client '{}' has been successfully closed", getClient().getName());
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Fails the insert when the given error list is non-empty.
     *
     * @param list insert errors reported by the validation response
     * @throws IOException carrying the message and cause of the first error in the list
     */
    private void handleInsertRowsErrors(List<InsertValidationResponse.InsertError> list) throws IOException {
        if (list.isEmpty()) {
            return;
        }
        // Only the first error is surfaced; it is kept as the cause so its stack trace is preserved.
        InsertValidationResponse.InsertError firstError = list.get(0);
        throw new IOException(String.format("Encountered errors while ingesting rows into Snowflake: %s", firstError.getException().getMessage()), firstError.getException());
    }

    /**
     * Inserts one row into the channel under a Failsafe {@link Fallback} that handles
     * {@link SFException}.
     *
     * <p>A failed attempt is logged as a warning and the fallback handler rethrows the last
     * exception wrapped in a {@link RuntimeException} (see
     * {@link #ingestionFallbackSupplier(Throwable)}). The row is inserted with {@code null} as the
     * second argument of {@code insertRow}. Note that the failure log text mentions re-opening the
     * channel although no re-open is attempted here.
     *
     * @param map the row to insert
     * @return the validation response returned by the channel
     */
    private InsertValidationResponse insertRowWithFallback(Map<String, Object> map) {
        return (InsertValidationResponse) Failsafe.with(Fallback.builder(attempt -> {
            ingestionFallbackSupplier(attempt.getLastException());
        }).handle(SFException.class).onFailedAttempt(failedAttempt -> {
            LOGGER.warn("Failed to send row to ingest channel", failedAttempt.getLastException());
        }).onFailure(completed -> {
            LOGGER.error(String.format("[INSERT_ROW_FALLBACK] Failed to re-open channel '%s'", getChannelName()), completed.getException());
        }).build(), new Fallback[0]).get(() -> {
            return this.channel.insertRow(map, null);
        });
    }

    /**
     * Fallback handler for a failed row insert: logs a warning and always throws.
     *
     * @param th the exception raised by the failed insert attempt
     * @throws RuntimeException always, wrapping {@code th}
     */
    private void ingestionFallbackSupplier(Throwable th) {
        LOGGER.warn("[INSERT_ROWS_FALLBACK] Failed to insert row with channel '{}'. Exiting with error", getChannelName());
        throw new RuntimeException(th);
    }

    /**
     * Reflectively calls a method declared on the runtime class of {@code obj}, making it
     * accessible first.
     *
     * <p>Only methods declared directly on {@code obj.getClass()} are found
     * ({@link Class#getDeclaredMethod}); inherited methods are not. Every reflective failure,
     * including an exception thrown by the invoked method itself, is reported with the same
     * "incompatible version" message, with the original exception kept as the cause.
     *
     * @param obj target instance
     * @param str name of the method to call
     * @param clsArr parameter types of the method
     * @param objArr arguments for the call
     * @return whatever the invoked method returns (may be {@code null})
     * @throws RuntimeException if the method cannot be found, accessed, or throws
     */
    private static Object invoke(Object obj, String str, Class<?>[] clsArr, Object[] objArr) {
        try {
            Method target = obj.getClass().getDeclaredMethod(str, clsArr);
            target.setAccessible(true);
            return target.invoke(obj, objArr);
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Incompatible SnowflakeStreamingIngestChannel version", e);
        }
    }
}
