package io.tidb.bigdata.flink.tidb;

import io.tidb.bigdata.jdbc.TiDBDriver;
import io.tidb.bigdata.tidb.ClientConfig;
import io.tidb.bigdata.tidb.ClientSession;
import io.tidb.bigdata.tidb.TiDBWriteMode;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.Preconditions;
import shade.bigdata.com.google.common.annotations.VisibleForTesting;
import shade.bigdata.com.google.common.collect.ImmutableSet;

/* loaded from: input_file:io/tidb/bigdata/flink/tidb/TiDBBaseDynamicTableFactory.class */
/**
 * Base Flink dynamic table factory for the {@code tidb} connector.
 *
 * <p>This class declares the connector options and implements the sink side by building a
 * {@link JdbcDynamicTableSink} that writes through a MySQL-dialect JDBC connection. The source
 * side ({@link DynamicTableSourceFactory}) is left to subclasses.
 */
public abstract class TiDBBaseDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    /** Identifier under which this factory is registered. */
    public static final String IDENTIFIER = "tidb";

    // Connection options; the keys are shared with ClientConfig.
    private static final ConfigOption<String> DATABASE_URL = ConfigOptions.key(ClientConfig.DATABASE_URL).stringType().noDefaultValue();
    private static final ConfigOption<String> USERNAME = ConfigOptions.key(ClientConfig.USERNAME).stringType().noDefaultValue();
    private static final ConfigOption<String> PASSWORD = ConfigOptions.key(ClientConfig.PASSWORD).stringType().noDefaultValue();

    /** Name of the database that the table lives in. */
    public static final ConfigOption<String> DATABASE_NAME = ConfigOptions.key("tidb.database.name").stringType().noDefaultValue();

    /** Name of the table to read from or write to. */
    public static final ConfigOption<String> TABLE_NAME = ConfigOptions.key("tidb.table.name").stringType().noDefaultValue();

    // Pool sizing options; declared here so they are accepted as optional options.
    private static final ConfigOption<String> MAX_POOL_SIZE = ConfigOptions.key(ClientConfig.MAX_POOL_SIZE).stringType().noDefaultValue();
    private static final ConfigOption<String> MIN_IDLE_SIZE = ConfigOptions.key(ClientConfig.MIN_IDLE_SIZE).stringType().noDefaultValue();

    // Write mode; key fields are only looked up when this resolves to TiDBWriteMode.UPSERT.
    private static final ConfigOption<String> WRITE_MODE = ConfigOptions.key(ClientConfig.TIDB_WRITE_MODE).stringType().defaultValue("append");

    // JDBC sink batching and retry options.
    private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions.key("sink.buffer-flush.max-rows").intType().defaultValue(100).withDescription("the flush max size (includes all append, upsert and delete records), over this number of records, will flush data. The default value is 100.");
    private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions.key("sink.buffer-flush.interval").durationType().defaultValue(Duration.ofSeconds(1)).withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The default value is 1s.");
    private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries").intType().defaultValue(3).withDescription("the max retry times if writing records to database failed.");

    /**
     * Returns the identifier of this factory.
     *
     * @return {@link #IDENTIFIER}
     */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /**
     * Returns the options that must be present: database url, username, database name and
     * table name.
     *
     * @return the set of required options
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return ImmutableSet.of(DATABASE_URL, USERNAME, DATABASE_NAME, TABLE_NAME);
    }

    /**
     * Returns the options that may be present: password, pool sizing, sink batching/retry
     * settings and write mode.
     *
     * @return the set of optional options
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return ImmutableSet.of(
            PASSWORD,
            MAX_POOL_SIZE,
            MIN_IDLE_SIZE,
            SINK_BUFFER_FLUSH_INTERVAL,
            SINK_BUFFER_FLUSH_MAX_ROWS,
            SINK_MAX_RETRIES,
            WRITE_MODE);
    }

    /**
     * Creates a JDBC-based sink for the table described by {@code context}.
     *
     * <p>The configured database url must look like {@code jdbc:(mysql|tidb)://host:port/...};
     * its path is rewritten to point at the configured database name before it is handed to
     * the JDBC sink.
     *
     * @param context factory context providing the catalog table and its options
     * @return a {@link JdbcDynamicTableSink} configured from the table options
     * @throws IllegalArgumentException if the database url does not match the expected format
     * @throws IllegalStateException if looking up key columns for upsert mode fails
     */
    @Override
    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        ReadableConfig options = FactoryUtil.createTableFactoryHelper(this, context).getOptions();
        TableSchema schema = context.getCatalogTable().getSchema();
        String databaseUrl = options.get(DATABASE_URL);
        String databaseName = options.get(DATABASE_NAME);
        String tableName = options.get(TABLE_NAME);
        Preconditions.checkArgument(
            databaseUrl.matches("jdbc:(mysql|tidb)://[^/]+:\\d+/.*"),
            "the format of database url does not match jdbc:(mysql|tidb)://host:port/.*");

        String jdbcUrl = rewriteJdbcUrlPath(databaseUrl, databaseName);
        // Pick the driver from the url scheme: the TiDB driver for tidb urls, MySQL otherwise.
        String driverName = jdbcUrl.startsWith(TiDBDriver.TIDB_PREFIX)
            ? ClientConfig.TIDB_DRIVER_NAME
            : "com.mysql.jdbc.Driver";

        JdbcOptions jdbcOptions = JdbcOptions.builder()
            .setDBUrl(jdbcUrl)
            .setTableName(tableName)
            .setUsername(options.get(USERNAME))
            .setPassword(options.get(PASSWORD))
            .setDialect(new MySQLDialect())
            .setDriverName(driverName)
            .build();

        JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
            .withBatchSize(options.get(SINK_BUFFER_FLUSH_MAX_ROWS))
            .withBatchIntervalMs(options.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis())
            .withMaxRetries(options.get(SINK_MAX_RETRIES))
            .build();

        JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
            .withTableName(jdbcOptions.getTableName())
            .withDialect(jdbcOptions.getDialect())
            .withFieldNames(schema.getFieldNames())
            .withKeyFields(getKeyFields(context, options, databaseName, tableName))
            .build();

        return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions, schema);
    }

    /**
     * Resolves the key fields passed to the JDBC sink.
     *
     * <p>Key columns are only looked up when the write mode is {@link TiDBWriteMode#UPSERT}.
     * In that case a short-lived {@link ClientSession} is opened from the catalog table
     * properties, and the unique key columns followed by the primary key columns are collected
     * without duplicates.
     *
     * @param context factory context providing the catalog table properties
     * @param readableConfig table options, used to read the write mode
     * @param str database name
     * @param str2 table name
     * @return the key column names, or {@code null} when the write mode is not upsert or the
     *     table has no unique/primary key columns
     * @throws IllegalStateException if the session cannot be created, queried or closed
     */
    private String[] getKeyFields(DynamicTableFactory.Context context, ReadableConfig readableConfig, String str, String str2) {
        if (TiDBWriteMode.fromString(readableConfig.get(WRITE_MODE)) != TiDBWriteMode.UPSERT) {
            return null;
        }
        // try-with-resources guarantees the session is closed even when a lookup throws.
        try (ClientSession session = ClientSession.createWithSingleConnection(
                new ClientConfig(context.getCatalogTable().toProperties()))) {
            Set<String> keyColumns = ImmutableSet.<String>builder()
                .addAll(session.getUniqueKeyColumns(str, str2))
                .addAll(session.getPrimaryKeyColumns(str, str2))
                .build();
            return keyColumns.isEmpty() ? null : keyColumns.toArray(new String[0]);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Replaces the path of a JDBC url with the given database name, keeping the rest of the
     * url (for example a query string) unchanged.
     *
     * <p>The url is parsed without its {@code jdbc:} prefix, then the
     * {@code jdbc:scheme://host:port/path} prefix is rebuilt from the parsed parts and replaced
     * with {@code jdbc:scheme://host:port/<database>}. NOTE: the replacement is a literal
     * string match, so if the rebuilt prefix does not occur verbatim in the url (for example
     * when the parser cannot extract a host or port), the url is returned unchanged.
     *
     * @param str the JDBC url, starting with {@code jdbc:}
     * @param str2 the database name to place in the url path
     * @return the url with its path pointing at the given database
     * @throws IllegalArgumentException if the url without its {@code jdbc:} prefix is not a
     *     valid URI
     */
    @VisibleForTesting
    protected String rewriteJdbcUrlPath(String str, String str2) {
        try {
            URI uri = new URI(str.substring("jdbc:".length()));
            String scheme = uri.getScheme();
            String host = uri.getHost();
            int port = uri.getPort();
            String currentPrefix = String.format("jdbc:%s://%s:%d%s", scheme, host, port, uri.getPath());
            String rewrittenPrefix = String.format("jdbc:%s://%s:%d/%s", scheme, host, port, str2);
            return str.replace(currentPrefix, rewrittenPrefix);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
