package io.deltastream.flink.connector.snowflake.sink.internal;

import io.deltastream.flink.connector.snowflake.sink.config.SnowflakeChannelConfig;
import io.deltastream.flink.connector.snowflake.sink.config.SnowflakeWriterConfig;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.streaming.FakeSnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.FakeSnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/deltastream/flink/connector/snowflake/sink/internal/SnowflakeSinkServiceImplTest.class */
class SnowflakeSinkServiceImplTest {

    /* loaded from: input_file:io/deltastream/flink/connector/snowflake/sink/internal/SnowflakeSinkServiceImplTest$FakeSinkWriterMetricGroup.class */
    private static class FakeSinkWriterMetricGroup implements SinkWriterMetricGroup {
        private FakeSinkWriterMetricGroup() {
        }

        public Counter getNumRecordsOutErrorsCounter() {
            throw new UnsupportedOperationException();
        }

        public Counter getNumRecordsSendErrorsCounter() {
            return new SimpleCounter();
        }

        public Counter getNumRecordsSendCounter() {
            return new SimpleCounter();
        }

        public Counter getNumBytesSendCounter() {
            throw new UnsupportedOperationException();
        }

        public void setCurrentSendTimeGauge(Gauge<Long> gauge) {
            throw new UnsupportedOperationException();
        }

        public OperatorIOMetricGroup getIOMetricGroup() {
            throw new UnsupportedOperationException();
        }

        public Counter counter(String str) {
            throw new UnsupportedOperationException();
        }

        public <C extends Counter> C counter(String str, C c) {
            throw new UnsupportedOperationException();
        }

        public <T, G extends Gauge<T>> G gauge(String str, G g) {
            throw new UnsupportedOperationException();
        }

        public <H extends Histogram> H histogram(String str, H h) {
            throw new UnsupportedOperationException();
        }

        public <M extends Meter> M meter(String str, M m) {
            throw new UnsupportedOperationException();
        }

        public MetricGroup addGroup(String str) {
            throw new UnsupportedOperationException();
        }

        public MetricGroup addGroup(String str, String str2) {
            throw new UnsupportedOperationException();
        }

        public String[] getScopeComponents() {
            throw new UnsupportedOperationException();
        }

        public Map<String, String> getAllVariables() {
            throw new UnsupportedOperationException();
        }

        public String getMetricIdentifier(String str) {
            throw new UnsupportedOperationException();
        }

        public String getMetricIdentifier(String str, CharacterFilter characterFilter) {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:io/deltastream/flink/connector/snowflake/sink/internal/SnowflakeSinkServiceImplTest$FakeSnowflakeSinkServiceImpl.class */
    private static class FakeSnowflakeSinkServiceImpl extends SnowflakeSinkServiceImpl {
        public FakeSnowflakeSinkServiceImpl(String str, int i, Properties properties, SnowflakeWriterConfig snowflakeWriterConfig, SnowflakeChannelConfig snowflakeChannelConfig, SinkWriterMetricGroup sinkWriterMetricGroup) {
            super(str, i, properties, snowflakeWriterConfig, snowflakeChannelConfig, sinkWriterMetricGroup);
        }

        SnowflakeStreamingIngestClient createClientFromConfig(String str, Properties properties) {
            return new FakeSnowflakeStreamingIngestClient(getChannelName());
        }
    }

    SnowflakeSinkServiceImplTest() {
    }

    @Test
    void testSuccessfulInsert() throws Exception {
        FakeSnowflakeSinkServiceImpl fakeSnowflakeSinkServiceImpl = new FakeSnowflakeSinkServiceImpl("appId", 0, new Properties(), SnowflakeWriterConfig.builder().build(), SnowflakeChannelConfig.builder().build("FAKE_DB", "FAKE_SCHEMA", "FAKE_TABLE"), new FakeSinkWriterMetricGroup());
        try {
            Assertions.assertEquals(0L, fakeSnowflakeSinkServiceImpl.getLatestCommittedOffsetFromSnowflakeIngestChannel());
            fakeSnowflakeSinkServiceImpl.insert(Map.of("field_1", "val_1"));
            Assertions.assertEquals(1L, fakeSnowflakeSinkServiceImpl.getLatestCommittedOffsetFromSnowflakeIngestChannel());
            fakeSnowflakeSinkServiceImpl.close();
        } catch (Throwable th) {
            try {
                fakeSnowflakeSinkServiceImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testInsertExceptionHandling() throws Exception {
        FakeSnowflakeSinkServiceImpl fakeSnowflakeSinkServiceImpl = new FakeSnowflakeSinkServiceImpl("appId", 0, new Properties(), SnowflakeWriterConfig.builder().build(), SnowflakeChannelConfig.builder().build("FAKE_DB", "FAKE_SCHEMA", "FAKE_TABLE"), new FakeSinkWriterMetricGroup()) { // from class: io.deltastream.flink.connector.snowflake.sink.internal.SnowflakeSinkServiceImplTest.1
            public SnowflakeStreamingIngestChannel getChannel() {
                return new FakeSnowflakeStreamingIngestChannel(getChannelName(), getChannelConfig().getDatabaseName(), getChannelConfig().getSchemaName(), getChannelConfig().getTableName()) { // from class: io.deltastream.flink.connector.snowflake.sink.internal.SnowflakeSinkServiceImplTest.1.1
                    @Override // net.snowflake.ingest.streaming.FakeSnowflakeStreamingIngestChannel
                    public InsertValidationResponse insertRow(Map<String, Object> map, String str) {
                        throw new SFException(ErrorCode.INTERNAL_ERROR, new Object[]{"test"});
                    }
                };
            }
        };
        try {
            Assertions.assertTrue(((IOException) Assertions.assertThrows(IOException.class, () -> {
                fakeSnowflakeSinkServiceImpl.insert(Map.of("field_1", "val_1"));
            })).getMessage().contains("Failed to insert row with Snowflake sink service"));
            fakeSnowflakeSinkServiceImpl.close();
        } catch (Throwable th) {
            try {
                fakeSnowflakeSinkServiceImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testInsertErrornHandling() throws Exception {
        FakeSnowflakeSinkServiceImpl fakeSnowflakeSinkServiceImpl = new FakeSnowflakeSinkServiceImpl("appId", 0, new Properties(), SnowflakeWriterConfig.builder().build(), SnowflakeChannelConfig.builder().build("FAKE_DB", "FAKE_SCHEMA", "FAKE_TABLE"), new FakeSinkWriterMetricGroup()) { // from class: io.deltastream.flink.connector.snowflake.sink.internal.SnowflakeSinkServiceImplTest.2
            public SnowflakeStreamingIngestChannel getChannel() {
                return new FakeSnowflakeStreamingIngestChannel(getChannelName(), getChannelConfig().getDatabaseName(), getChannelConfig().getSchemaName(), getChannelConfig().getTableName()) { // from class: io.deltastream.flink.connector.snowflake.sink.internal.SnowflakeSinkServiceImplTest.2.1
                    @Override // net.snowflake.ingest.streaming.FakeSnowflakeStreamingIngestChannel
                    public InsertValidationResponse insertRow(Map<String, Object> map, String str) {
                        InsertValidationResponse insertValidationResponse = new InsertValidationResponse();
                        InsertValidationResponse.InsertError insertError = new InsertValidationResponse.InsertError(map, Long.parseLong(str));
                        insertError.setException(new SFException(ErrorCode.INTERNAL_ERROR, new Object[]{"test"}));
                        insertValidationResponse.addError(insertError);
                        return insertValidationResponse;
                    }
                };
            }
        };
        try {
            Assertions.assertTrue(((IOException) Assertions.assertThrows(IOException.class, () -> {
                fakeSnowflakeSinkServiceImpl.insert(Map.of("field_1", "val_1"));
            })).getMessage().contains("Encountered errors while ingesting rows into Snowflake: Ingest client internal error: test."));
            fakeSnowflakeSinkServiceImpl.close();
        } catch (Throwable th) {
            try {
                fakeSnowflakeSinkServiceImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testFetchOffsetTokenErrorHandling() {
        FlinkRuntimeException assertThrows = Assertions.assertThrows(FlinkRuntimeException.class, () -> {
            new FakeSnowflakeSinkServiceImpl("appId", 0, new Properties(), SnowflakeWriterConfig.builder().build(), SnowflakeChannelConfig.builder().build("FAKE_DB", "FAKE_SCHEMA", "FAKE_TABLE"), new FakeSinkWriterMetricGroup()) { // from class: io.deltastream.flink.connector.snowflake.sink.internal.SnowflakeSinkServiceImplTest.3
                public SnowflakeStreamingIngestClient getClient() {
                    return new FakeSnowflakeStreamingIngestClient(getChannelName()) { // from class: io.deltastream.flink.connector.snowflake.sink.internal.SnowflakeSinkServiceImplTest.3.1
                        @Override // net.snowflake.ingest.streaming.FakeSnowflakeStreamingIngestClient
                        public Map<String, String> getLatestCommittedOffsetTokens(List<SnowflakeStreamingIngestChannel> list) {
                            HashMap hashMap = new HashMap();
                            list.forEach(snowflakeStreamingIngestChannel -> {
                                hashMap.put(snowflakeStreamingIngestChannel.getFullyQualifiedName(), "invalid_token");
                            });
                            return hashMap;
                        }
                    };
                }
            };
        });
        Assertions.assertTrue(assertThrows.getMessage().contains(String.format("The offsetToken '%s' cannot be parsed as a long for channel", "invalid_token")));
        Assertions.assertTrue(assertThrows.getCause() instanceof NumberFormatException);
    }
}
