package com.github.rexsheng.springboot.faster.flink.cdc.application;

import com.github.rexsheng.springboot.faster.flink.cdc.application.dto.MySqlDataSource;
import com.github.rexsheng.springboot.faster.flink.cdc.configuration.CdcProperties;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import io.debezium.data.Envelope;
import jakarta.annotation.PreDestroy;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;

/* loaded from: input_file:com/github/rexsheng/springboot/faster/flink/cdc/application/CdcRunner.class */
/**
 * Spring Boot {@link ApplicationRunner} that builds and executes a Flink streaming job
 * reading MySQL change events through the Flink CDC {@link MySqlSource}.
 *
 * <p>Each event is filtered by operation type, routed by its source table name to the
 * side output registered for a handler key, and delivered to a {@link CdcHandler}.
 * Events whose table matches no handler key stay on the main output, which has no sink
 * attached here.
 */
public class CdcRunner implements ApplicationRunner {
    private static final Logger logger = LoggerFactory.getLogger(CdcRunner.class);

    /** Time zone applied to the CDC source, the JDBC connection and Debezium. */
    private static final String TIME_ZONE = "Asia/Shanghai";

    private final CdcProperties cdcProperties;
    private final Map<String, Map<String, CdcHandler>> cdcHandlers;
    private final String[] mappingTables;
    private final BinlogOffset binlogOffset;

    // Written by run() and read by preDestroy(); volatile so the write is visible
    // if the container invokes preDestroy() from a different thread.
    private volatile StreamExecutionEnvironment env;

    /**
     * @param cdcProperties connection, startup and parallelism settings
     * @param map           handlers grouped by handler key; a key is matched against the
     *                      table name as an exact name, a comma-separated list of names,
     *                      or a regular expression
     * @param strArr        table list passed to the MySQL CDC source
     * @param binlogOffset  offset used when the configured startup mode is {@code offset}
     */
    public CdcRunner(CdcProperties cdcProperties, Map<String, Map<String, CdcHandler>> map, String[] strArr, BinlogOffset binlogOffset) {
        this.cdcProperties = cdcProperties;
        this.cdcHandlers = map;
        this.mappingTables = strArr;
        this.binlogOffset = binlogOffset;
    }

    /** Closes the Flink execution environment, if {@link #run} created one. */
    @PreDestroy
    public void preDestroy() {
        StreamExecutionEnvironment current = this.env;
        if (current == null) {
            return;
        }
        try {
            logger.info("CdcRunner Shutting down");
            current.close();
            logger.info("CdcRunner Shutdown complete");
        } catch (Exception e) {
            // Best effort: a failed close must not abort the rest of the shutdown.
            logger.error("CdcRunner Shutdown failed", e);
        }
    }

    /**
     * Maps the configured startup mode to the Flink CDC {@link StartupOptions}.
     *
     * @throws IllegalStateException if the configured startup mode is not recognised
     */
    private StartupOptions getStartupOptions() {
        Object startup = this.cdcProperties.getStartup();
        if (CdcProperties.StartupOptions.initial.equals(startup)) {
            return StartupOptions.initial();
        }
        if (CdcProperties.StartupOptions.latest.equals(startup)) {
            return StartupOptions.latest();
        }
        if (CdcProperties.StartupOptions.earliest.equals(startup)) {
            return StartupOptions.earliest();
        }
        if (CdcProperties.StartupOptions.offset.equals(startup)) {
            return StartupOptions.specificOffset(this.binlogOffset);
        }
        throw new IllegalStateException("Unknown startup options " + startup);
    }

    /**
     * Builds the job graph (source, operation filter, table router, one sink per handler
     * key) and executes it.
     *
     * @param applicationArguments not used
     * @throws Exception if the job cannot be built or its execution fails
     */
    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.env.enableCheckpointing(this.cdcProperties.getSyncInterval(), CheckpointingMode.EXACTLY_ONCE);

        DataStreamSource<MySqlDataSource> source =
                this.env.fromSource(buildSource(), WatermarkStrategy.noWatermarks(), "MySQL Source");
        source.setParallelism(this.cdcProperties.getMapParallelism().intValue());

        SingleOutputStreamOperator<MySqlDataSource> filtered = source.filter(new SupportedOperationFilter());

        // One side-output tag per handler key; the tag id is the handler key itself.
        Map<String, OutputTag<MySqlDataSource>> outputTags = new HashMap<>();
        for (String handlerKey : this.cdcHandlers.keySet()) {
            outputTags.put(handlerKey, new OutputTag<>(handlerKey, TypeInformation.of(MySqlDataSource.class)));
        }

        SingleOutputStreamOperator<MySqlDataSource> routed = filtered.process(new TableRouter(outputTags));

        for (Map.Entry<String, OutputTag<MySqlDataSource>> entry : outputTags.entrySet()) {
            OutputTag<MySqlDataSource> tag = entry.getValue();
            SideOutputDataStream<MySqlDataSource> sideOutput = routed.getSideOutput(tag);
            // NOTE(review): only the first handler of the inner map receives events, and an
            // empty inner map fails here with NoSuchElementException — confirm this is intended.
            CdcHandler handler = this.cdcHandlers.get(entry.getKey()).values().iterator().next();
            sideOutput.addSink(new HandlerSink(handler))
                    .name(tag.getId())
                    .setParallelism(this.cdcProperties.getReduceParallelism().intValue());
        }

        this.env.execute("Print MySQL Snapshot + Binlog");
    }

    /** Creates the MySQL CDC source from the configured connection settings. */
    private MySqlSource<MySqlDataSource> buildSource() {
        Properties jdbcProperties = new Properties();
        jdbcProperties.setProperty("autoReconnect", "true");
        jdbcProperties.setProperty("serverTimezone", TIME_ZONE);
        jdbcProperties.setProperty("connectionTimeZone", TIME_ZONE);
        jdbcProperties.setProperty("characterEncoding", "UTF-8");

        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("database.connectionTimeZone", TIME_ZONE);

        return MySqlSource.<MySqlDataSource>builder()
                .serverTimeZone(TIME_ZONE)
                .hostname(this.cdcProperties.getHostName())
                .port(this.cdcProperties.getPort())
                .databaseList(this.cdcProperties.getDatabase())
                .tableList(this.mappingTables)
                .username(this.cdcProperties.getUsername())
                .password(this.cdcProperties.getPassword())
                .jdbcProperties(jdbcProperties)
                .debeziumProperties(debeziumProperties)
                .serverId(this.cdcProperties.getServerId())
                .includeSchemaChanges(false)
                .startupOptions(getStartupOptions())
                .deserializer(new MySqlDeserializationSchema())
                .build();
    }

    /**
     * Keeps only events whose operation code is create, update, delete, truncate or read.
     * Static so the function does not carry a reference to the enclosing runner.
     */
    private static final class SupportedOperationFilter implements FilterFunction<MySqlDataSource> {
        private final List<String> supportedOperations = Arrays.asList(
                Envelope.Operation.CREATE.code(),
                Envelope.Operation.UPDATE.code(),
                Envelope.Operation.DELETE.code(),
                Envelope.Operation.TRUNCATE.code(),
                Envelope.Operation.READ.code());

        @Override
        public boolean filter(MySqlDataSource mySqlDataSource) throws Exception {
            return mySqlDataSource.getOperation() != null
                    && this.supportedOperations.contains(mySqlDataSource.getOperation());
        }
    }

    /**
     * Routes each event to the side output of the first handler key matching its table name.
     *
     * <p>An exact key match is tried first. Otherwise the keys are scanned in map iteration
     * order and a key matches when the table name is one of its comma-separated parts, or
     * when the key, compiled as a regular expression, matches the whole table name.
     * Unmatched events are emitted on the main output.
     */
    private static final class TableRouter extends ProcessFunction<MySqlDataSource, MySqlDataSource> {
        private final Map<String, OutputTag<MySqlDataSource>> outputTags;

        // Compiled lazily per key, so an invalid expression still fails only when a record
        // actually reaches that key, exactly as when it was compiled for every record.
        private transient Map<String, Pattern> compiledPatterns;

        TableRouter(Map<String, OutputTag<MySqlDataSource>> outputTags) {
            // Private HashMap copy: later changes to the caller's map cannot affect routing.
            this.outputTags = new HashMap<>(outputTags);
        }

        @Override
        public void processElement(MySqlDataSource mySqlDataSource, ProcessFunction<MySqlDataSource, MySqlDataSource>.Context context, Collector<MySqlDataSource> collector) throws Exception {
            String table = mySqlDataSource.getSource().get("table").toString();

            OutputTag<MySqlDataSource> exactTag = this.outputTags.get(table);
            if (exactTag != null) {
                context.output(exactTag, mySqlDataSource);
                return;
            }

            for (Map.Entry<String, OutputTag<MySqlDataSource>> entry : this.outputTags.entrySet()) {
                String handlerKey = entry.getKey();
                if (Arrays.asList(handlerKey.split(",")).contains(table)
                        || patternFor(handlerKey).matcher(table).matches()) {
                    context.output(entry.getValue(), mySqlDataSource);
                    return;
                }
            }

            collector.collect(mySqlDataSource);
        }

        /** Returns the cached compiled pattern for a handler key, compiling it on first use. */
        private Pattern patternFor(String handlerKey) {
            if (this.compiledPatterns == null) {
                this.compiledPatterns = new HashMap<>();
            }
            return this.compiledPatterns.computeIfAbsent(handlerKey, Pattern::compile);
        }
    }

    /** Sink that passes every event it receives to a single {@link CdcHandler}. */
    private static final class HandlerSink implements SinkFunction<MySqlDataSource> {
        private final CdcHandler handler;

        HandlerSink(CdcHandler handler) {
            this.handler = handler;
        }

        @Override
        public void invoke(MySqlDataSource mySqlDataSource, SinkFunction.Context context) throws Exception {
            this.handler.handle(mySqlDataSource);
        }
    }
}
