package io.tidb.bigdata.flink.connector.source;

import io.tidb.bigdata.flink.connector.source.CDCSourceBuilder;
import io.tidb.bigdata.flink.format.cdc.CDCDeserializationSchemaBuilder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.KafkaOptions;
import org.apache.flink.table.data.RowData;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:io/tidb/bigdata/flink/connector/source/KafkaCDCSourceBuilder.class */
public class KafkaCDCSourceBuilder extends CDCSourceBuilder<KafkaPartitionSplit, KafkaSourceEnumState> {
    private final KafkaSourceBuilder<RowData> builder;
    private static final String OPTION_PREFIX = "tidb.streaming.kafka.";
    private static final int OPTION_PREFIX_LENGTH = OPTION_PREFIX.length();

    public KafkaCDCSourceBuilder(CDCDeserializationSchemaBuilder cDCDeserializationSchemaBuilder) {
        super(cDCDeserializationSchemaBuilder);
        this.builder = KafkaSource.builder();
    }

    @Override // io.tidb.bigdata.flink.connector.source.CDCSourceBuilder
    public CDCSourceBuilder.Type type() {
        return CDCSourceBuilder.Type.KAFKA;
    }

    @Override // io.tidb.bigdata.flink.connector.source.CDCSourceBuilder
    protected CDCSource<KafkaPartitionSplit, KafkaSourceEnumState> doBuild(DeserializationSchema<RowData> deserializationSchema) {
        setDeserializer(deserializationSchema);
        return new CDCSource<>(this.builder.build());
    }

    @Override // io.tidb.bigdata.flink.connector.source.CDCSourceBuilder
    protected CDCSource<KafkaPartitionSplit, KafkaSourceEnumState> doBuild(KafkaDeserializationSchema<RowData> kafkaDeserializationSchema) {
        setDeserializer(kafkaDeserializationSchema);
        return new CDCSource<>(this.builder.build());
    }

    public KafkaCDCSourceBuilder setBootstrapServers(String str) {
        return setProperty("bootstrap.servers", str);
    }

    public KafkaCDCSourceBuilder setGroupId(String str) {
        return setProperty("group.id", str);
    }

    public KafkaCDCSourceBuilder setTopics(List<String> list) {
        this.builder.setTopics(list);
        return this;
    }

    public KafkaCDCSourceBuilder setTopics(String... strArr) {
        return setTopics(Arrays.asList(strArr));
    }

    public KafkaCDCSourceBuilder setTopicPattern(Pattern pattern) {
        this.builder.setTopicPattern(pattern);
        return this;
    }

    public KafkaCDCSourceBuilder setPartitions(Set<TopicPartition> set) {
        this.builder.setPartitions(set);
        return this;
    }

    public KafkaCDCSourceBuilder setStartingOffsets(OffsetsInitializer offsetsInitializer) {
        this.builder.setStartingOffsets(offsetsInitializer);
        return this;
    }

    public KafkaCDCSourceBuilder setDeserializer(DeserializationSchema<RowData> deserializationSchema) {
        this.builder.setValueOnlyDeserializer(deserializationSchema);
        return this;
    }

    public KafkaCDCSourceBuilder setDeserializer(KafkaDeserializationSchema<RowData> kafkaDeserializationSchema) {
        this.builder.setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializationSchema));
        return this;
    }

    public KafkaCDCSourceBuilder setClientIdPrefix(String str) {
        return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), str);
    }

    public KafkaCDCSourceBuilder setProperty(String str, String str2) {
        this.builder.setProperty(str, str2);
        return this;
    }

    private void handleOptions(String str, String str2) {
        if (str.startsWith(OPTION_PREFIX)) {
            str = str.substring(OPTION_PREFIX_LENGTH);
        }
        if (KafkaOptions.PROPS_BOOTSTRAP_SERVERS.key().equals(str)) {
            setBootstrapServers(str2);
            return;
        }
        if (KafkaOptions.PROPS_GROUP_ID.key().equals(str)) {
            setGroupId(str2);
        } else if (KafkaOptions.TOPIC.key().equals(str)) {
            setTopics(str2);
        } else {
            setProperty(str, str2);
        }
    }

    public KafkaCDCSourceBuilder setProperties(Map<String, String> map) {
        for (Map.Entry<String, String> entry : map.entrySet()) {
            handleOptions(entry.getKey(), entry.getValue());
        }
        return this;
    }

    public KafkaCDCSourceBuilder setProperties(Properties properties) {
        for (Map.Entry entry : properties.entrySet()) {
            handleOptions(entry.getKey().toString(), entry.getValue().toString());
        }
        return this;
    }
}
