package com.dataartisans.flinktraining.exercises.table_java.memberotm;

import com.dataartisans.flinktraining.dataset_preparation.MBoxParser;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.TableEnvironment;

/* loaded from: input_file:com/dataartisans/flinktraining/exercises/table_java/memberotm/MemberOTMonth.class */
public class MemberOTMonth {

    /* loaded from: input_file:com/dataartisans/flinktraining/exercises/table_java/memberotm/MemberOTMonth$MonthEmailExtractor.class */
    public static class MonthEmailExtractor implements MapFunction<Tuple2<String, String>, Tuple2<String, String>> {
        public Tuple2<String, String> map(Tuple2<String, String> tuple2) throws Exception {
            return new Tuple2<>(((String) tuple2.f0).substring(0, 7), ((String) tuple2.f1).substring(((String) tuple2.f1).lastIndexOf("<") + 1, ((String) tuple2.f1).length() - 1));
        }
    }

    public static void main(String[] strArr) throws Exception {
        String required = ParameterTool.fromArgs(strArr).getRequired("input");
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        MapOperator map = executionEnvironment.readCsvFile(required).lineDelimiter(MBoxParser.MAIL_RECORD_DELIM).fieldDelimiter(MBoxParser.MAIL_FIELD_DELIM).includeFields("011").types(String.class, String.class).map(new MonthEmailExtractor());
        BatchTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(executionEnvironment);
        Table select = tableEnvironment.fromDataSet(map, "month, sender").filter("sender !== 'jira@apache.org' && sender !== 'no-reply@apache.org' && sender !== 'git@git.apache.org'").groupBy("month, sender").select("month, sender, month.count as cnt");
        tableEnvironment.toDataSet(select.groupBy("month").select("month as m, cnt.max as max").join(select).where("month = m && cnt = max").select("month, sender"), Row.class).print();
    }
}
