package io.tidb.bigdata.flink.tidb;

import io.tidb.bigdata.cdc.Key;
import io.tidb.bigdata.flink.format.cdc.CraftFormatFactory;
import io.tidb.bigdata.flink.format.cdc.FormatOptions;
import io.tidb.bigdata.tidb.ClientConfig;
import io.tidb.bigdata.tidb.ClientSession;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/tidb/bigdata/flink/tidb/TiDBDynamicTableFactory.class */
public class TiDBDynamicTableFactory extends TiDBBaseDynamicTableFactory {
    /** Logger used by this factory and by the anonymous classes created in it. */
    static final Logger LOG = LoggerFactory.getLogger(TiDBDynamicTableFactory.class);

    /**
     * String option {@code tidb.streaming.source} with no default value. When present,
     * {@code createDynamicTableSource} builds a streaming source instead of a batch source.
     */
    public static final ConfigOption<String> STREAMING_SOURCE =
            ConfigOptions.key("tidb.streaming.source")
                    .stringType()
                    .noDefaultValue();

    /** {@link #STREAMING_SOURCE} value that selects the Kafka default parameters. */
    private static final String STREAMING_SOURCE_KAFKA = "kafka";

    /** {@link #STREAMING_SOURCE} value that selects the Pulsar default parameters. */
    private static final String STREAMING_SOURCE_PULSAR = "pulsar";

    /**
     * Bundles the original {@link DynamicTableFactory.Context} with the streaming source type,
     * a version number and the function that fills in source-specific default properties.
     *
     * <p>Every context accessor simply forwards to the wrapped context.
     */
    public static class StreamingSourceContext {
        /** Streaming source type name. */
        final String source;
        /** Wrapped factory context that all delegating accessors read from. */
        final DynamicTableFactory.Context context;
        /** Version number carried alongside the context. */
        final long version;
        /** Function applied by {@link #populateDefault(Map)}. */
        final Function<Map<String, String>, Map<String, String>> populateDefault;

        private StreamingSourceContext(
                String source,
                DynamicTableFactory.Context context,
                long version,
                Function<Map<String, String>, Map<String, String>> populateDefault) {
            this.source = source;
            this.context = context;
            this.version = version;
            this.populateDefault = populateDefault;
        }

        /** Returns the catalog table of the wrapped context. */
        public CatalogTable getCatalogTable() {
            return context.getCatalogTable();
        }

        /** Returns the streaming source type name. */
        public String getSourceType() {
            return source;
        }

        /** Returns the version number given at construction. */
        public long getVersion() {
            return version;
        }

        /** Returns the object identifier of the wrapped context. */
        public ObjectIdentifier getObjectIdentifier() {
            return context.getObjectIdentifier();
        }

        /** Returns the configuration of the wrapped context. */
        public ReadableConfig getConfiguration() {
            return context.getConfiguration();
        }

        /** Returns the class loader of the wrapped context. */
        public ClassLoader getClassLoader() {
            return context.getClassLoader();
        }

        /** Returns whether the wrapped context reports the table as temporary. */
        public boolean isTemporary() {
            return context.isTemporary();
        }

        /**
         * Applies the source-specific default function to the given properties.
         *
         * @param properties properties to hand to the function
         * @return whatever the function returns
         */
        public Map<String, String> populateDefault(Map<String, String> properties) {
            return populateDefault.apply(properties);
        }
    }

    /**
     * Opens a single-connection {@link ClientSession} built from the given properties and
     * returns the version of its current snapshot.
     *
     * <p>The session is always closed afterwards; failures raised by {@code close} are
     * suppressed so that they never mask the result or the original error.
     *
     * @param map client configuration properties
     * @return the snapshot version reported by the session
     */
    private static long getSnapshotTimestamp(Map<String, String> map) {
        ClientSession session = null;
        try {
            session = ClientSession.createWithSingleConnection(new ClientConfig(map));
            return session.getSnapshotVersion().getVersion();
        } finally {
            // session stays null when creation itself failed; nothing to close then.
            if (session != null) {
                ExceptionUtils.suppressExceptions(session::close);
            }
        }
    }

    /**
     * Creates the table source for the given context.
     *
     * <p>When the {@link #STREAMING_SOURCE} option is set, a streaming source of that type is
     * built; otherwise a {@link TiDBBatchDynamicTableSource} is returned.
     *
     * @param context factory context describing the table
     * @return the streaming or batch table source
     */
    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        ReadableConfig tableOptions = FactoryUtil.createTableFactoryHelper(this, context).getOptions();
        Map<String, String> tableProperties = context.getCatalogTable().toProperties();
        TableSchema tableSchema = context.getCatalogTable().getSchema();
        JdbcLookupOptions lookup = getLookupOptions(context);

        Optional<String> streamingSource = tableOptions.getOptional(STREAMING_SOURCE);
        if (streamingSource.isPresent()) {
            return createStreamingTableSource(
                    context, streamingSource.get(), tableSchema, tableProperties, lookup);
        }
        return new TiDBBatchDynamicTableSource(tableSchema, tableProperties, lookup);
    }

    /**
     * Returns the optional options of this factory.
     *
     * <p>Delegates to {@code withMoreOptionalOptions}, passing {@link #STREAMING_SOURCE};
     * presumably that helper adds it to the options inherited from the base factory.
     */
    @Override // io.tidb.bigdata.flink.tidb.TiDBBaseDynamicTableFactory
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = withMoreOptionalOptions(STREAMING_SOURCE);
        return options;
    }

    /** Returns the string form of a freshly generated random {@link UUID}. */
    private static String randomID() {
        UUID id = UUID.randomUUID();
        return id.toString();
    }

    /**
     * Adds the Kafka default to the given properties: {@code properties.group.id} is set to a
     * new random ID, replacing any existing value.
     *
     * @param properties map to modify in place
     * @return the same map instance that was passed in
     */
    private static Map<String, String> kafkaDefaultParameters(Map<String, String> properties) {
        String groupId = randomID();
        properties.put("properties.group.id", groupId);
        return properties;
    }

    /**
     * Adds the Pulsar defaults to the given properties: {@code scan.startup.mode} becomes
     * {@code external-subscription} and {@code scan.startup.sub-name} becomes a new random ID.
     * Existing values for both keys are replaced.
     *
     * @param properties map to modify in place
     * @return the same map instance that was passed in
     */
    private static Map<String, String> pulsarDefaultParameters(Map<String, String> properties) {
        properties.put("scan.startup.mode", "external-subscription");
        String subscriptionName = randomID();
        properties.put("scan.startup.sub-name", subscriptionName);
        return properties;
    }

    /**
     * Builds a {@link TiDBStreamingDynamicTableSource} for the requested streaming source type.
     *
     * <p>The source type selects the function that supplies source-specific default
     * parameters. The current snapshot version is then read and stored under
     * {@link ClientConfig#SNAPSHOT_VERSION} in a copy of the table properties; the same
     * version is passed to the streaming source context and to the resulting table source.
     *
     * @param context factory context of the table being created
     * @param sourceType value of the streaming source option, e.g. {@code kafka} or {@code pulsar}
     * @param tableSchema schema of the table
     * @param properties table properties; not modified, a copy is extended instead
     * @param jdbcLookupOptions lookup options passed to the table source
     * @return the streaming table source
     * @throws IllegalStateException if {@code sourceType} is not a supported streaming source
     */
    private DynamicTableSource createStreamingTableSource(DynamicTableFactory.Context context, String sourceType, TableSchema tableSchema, Map<String, String> properties, JdbcLookupOptions jdbcLookupOptions) {
        final Function<Map<String, String>, Map<String, String>> populateDefault;
        switch (sourceType) {
            case STREAMING_SOURCE_KAFKA:
                populateDefault = TiDBDynamicTableFactory::kafkaDefaultParameters;
                break;
            case STREAMING_SOURCE_PULSAR:
                populateDefault = TiDBDynamicTableFactory::pulsarDefaultParameters;
                break;
            default:
                throw new IllegalStateException("Not supported TiDB streaming source: " + sourceType);
        }
        // The source type is validated before the snapshot version is fetched, so an
        // unsupported type fails without opening a client session.
        long snapshotTimestamp = getSnapshotTimestamp(properties);
        Map<String, String> versionedProperties = new HashMap<>(properties);
        versionedProperties.put(ClientConfig.SNAPSHOT_VERSION, Long.toString(snapshotTimestamp));
        ScanTableSource streamingSource = createStreamingSource(
                new StreamingSourceContext(sourceType, context, snapshotTimestamp, populateDefault));
        return new TiDBStreamingDynamicTableSource(
                tableSchema, versionedProperties, jdbcLookupOptions, streamingSource, snapshotTimestamp);
    }

    /**
     * Wraps a {@link StreamingSourceContext} into a {@link DynamicTableFactory.Context} whose
     * catalog table exposes the properties meant for the underlying streaming source.
     *
     * <p>The exposed properties are computed on every {@code toProperties()} /
     * {@code getProperties()} call as follows:
     * <ol>
     *   <li>defaults: {@code value.format} and the {@code value.ticdc-craft.*} options (type
     *       filter, earliest version, and schema/table filters copied from the TiDB database
     *       and table name options when present), passed through the context's
     *       source-specific default function;</li>
     *   <li>every original property whose key starts with
     *       {@code tidb.streaming.<sourceType>.} is added with that prefix stripped,
     *       overriding any default with the same key.</li>
     * </ol>
     * All other accessors delegate to the original context or catalog table.
     *
     * @param streamingSourceContext context to wrap
     * @return a factory context suitable for the streaming source factory
     */
    private static DynamicTableFactory.Context wrapContext(final StreamingSourceContext streamingSourceContext) {
        final CatalogTable catalogTable = streamingSourceContext.getCatalogTable();
        final String prefix = "tidb.streaming." + streamingSourceContext.getSourceType() + ".";
        return new DynamicTableFactory.Context() {
            public ObjectIdentifier getObjectIdentifier() {
                return streamingSourceContext.getObjectIdentifier();
            }

            public CatalogTable getCatalogTable() {
                return new CatalogTable() {
                    public boolean isPartitioned() {
                        return catalogTable.isPartitioned();
                    }

                    public List<String> getPartitionKeys() {
                        return catalogTable.getPartitionKeys();
                    }

                    public CatalogTable copy(Map<String, String> map) {
                        return catalogTable.copy(map);
                    }

                    public CatalogBaseTable copy() {
                        return catalogTable.copy();
                    }

                    public Map<String, String> toProperties() {
                        return convertProperties(catalogTable.toProperties());
                    }

                    public Map<String, String> getProperties() {
                        return convertProperties(catalogTable.getProperties());
                    }

                    /** Copies {@code from[fromKey]} to {@code to[toKey]} when the key is present. */
                    private void copyIfPresent(Map<String, String> from, String fromKey, Map<String, String> to, String toKey) {
                        if (from.containsKey(fromKey)) {
                            to.put(toKey, from.get(fromKey));
                        }
                    }

                    /** Builds the default streaming-source properties for the given table properties. */
                    private Map<String, String> newDefaultProperties(Map<String, String> tableProperties) {
                        Map<String, String> defaults = new HashMap<>();
                        copyIfPresent(tableProperties, TiDBBaseDynamicTableFactory.DATABASE_NAME.key(), defaults, "value.ticdc-craft." + FormatOptions.SCHEMA_INCLUDE.key());
                        copyIfPresent(tableProperties, TiDBBaseDynamicTableFactory.TABLE_NAME.key(), defaults, "value.ticdc-craft." + FormatOptions.TABLE_INCLUDE.key());
                        defaults.put("value.format", CraftFormatFactory.IDENTIFIER);
                        defaults.put("value.ticdc-craft." + FormatOptions.TYPE_INCLUDE.key(), Key.Type.ROW_CHANGED.name());
                        defaults.put("value.ticdc-craft." + FormatOptions.EARLIEST_VERSION.key(), Long.toString(streamingSourceContext.getVersion()));
                        // NOTE(review): defaults are recomputed on every call, so values generated
                        // by the source-specific function (e.g. random IDs) differ between calls.
                        return streamingSourceContext.populateDefault(defaults);
                    }

                    /** Merges the defaults with the prefix-stripped user overrides. */
                    private Map<String, String> convertProperties(Map<String, String> tableProperties) {
                        Map<String, String> converted = newDefaultProperties(tableProperties);
                        for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
                            String key = entry.getKey();
                            if (key.startsWith(prefix)) {
                                converted.put(key.substring(prefix.length()), entry.getValue());
                            }
                        }
                        TiDBDynamicTableFactory.LOG.info("Final properties for streaming source: {}", converted.toString());
                        return converted;
                    }

                    public TableSchema getSchema() {
                        return catalogTable.getSchema();
                    }

                    public String getComment() {
                        return catalogTable.getComment();
                    }

                    public Optional<String> getDescription() {
                        return catalogTable.getDescription();
                    }

                    public Optional<String> getDetailedDescription() {
                        return catalogTable.getDetailedDescription();
                    }
                };
            }

            public ReadableConfig getConfiguration() {
                return streamingSourceContext.getConfiguration();
            }

            public ClassLoader getClassLoader() {
                return streamingSourceContext.getClassLoader();
            }

            public boolean isTemporary() {
                return streamingSourceContext.isTemporary();
            }
        };
    }

    /**
     * Discovers the {@link DynamicTableSourceFactory} whose identifier equals the context's
     * source type and uses it to create the underlying streaming source.
     *
     * <p>The factory is looked up through the current thread's context class loader and is
     * given the wrapped context produced by {@code wrapContext}.
     *
     * @param streamingSourceContext context describing the streaming source to create
     * @return the created source
     * @throws ClassCastException if the discovered factory creates a source that is not a
     *     {@link ScanTableSource}
     */
    private static ScanTableSource createStreamingSource(StreamingSourceContext streamingSourceContext) {
        DynamicTableSourceFactory factory = FactoryUtil.discoverFactory(
                Thread.currentThread().getContextClassLoader(),
                DynamicTableSourceFactory.class,
                streamingSourceContext.source);
        DynamicTableSource tableSource = factory.createDynamicTableSource(wrapContext(streamingSourceContext));
        // The factory API returns the general DynamicTableSource type; an explicit downcast is
        // required to satisfy this method's ScanTableSource return type.
        return (ScanTableSource) tableSource;
    }
}
