package com.dataartisans.flinktraining.exercises.datastream_java.windows;

import com.dataartisans.flinktraining.exercises.datastream_java.basics.RideCleansing;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.GeoUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

/* loaded from: input_file:com/dataartisans/flinktraining/exercises/datastream_java/windows/PopularPlaces.class */
public class PopularPlaces {

    /* loaded from: input_file:com/dataartisans/flinktraining/exercises/datastream_java/windows/PopularPlaces$GridCellMatcher.class */
    /**
     * Maps a {@link TaxiRide} event to a pair of (grid cell id, start flag).
     *
     * <p>The grid cell is computed by {@code GeoUtils.mapToGridCell} from the start
     * coordinates when the event is flagged as a start, and from the end coordinates
     * otherwise.
     */
    public static class GridCellMatcher implements MapFunction<TaxiRide, Tuple2<Integer, Boolean>> {
        /**
         * Converts one ride event into its grid cell and start flag.
         *
         * @param taxiRide the ride event to map
         * @return a tuple of the grid cell id and {@code true} for a start event,
         *         {@code false} otherwise
         */
        public Tuple2<Integer, Boolean> map(TaxiRide taxiRide) throws Exception {
            final int cellId;
            final boolean startEvent;
            if (taxiRide.isStart) {
                cellId = GeoUtils.mapToGridCell(taxiRide.startLon, taxiRide.startLat);
                startEvent = true;
            } else {
                cellId = GeoUtils.mapToGridCell(taxiRide.endLon, taxiRide.endLat);
                startEvent = false;
            }
            return new Tuple2<>(cellId, startEvent);
        }
    }

    /* loaded from: input_file:com/dataartisans/flinktraining/exercises/datastream_java/windows/PopularPlaces$GridToCoordinates.class */
    /**
     * Replaces the grid cell id at the front of a result tuple with the longitude and
     * latitude of that cell's center, as returned by {@code GeoUtils}.
     *
     * <p>The remaining three fields are passed through unchanged.
     */
    public static class GridToCoordinates implements MapFunction<Tuple4<Integer, Long, Boolean, Integer>, Tuple5<Float, Float, Long, Boolean, Integer>> {
        /**
         * Expands field 0 (the grid cell id) into center longitude and latitude.
         *
         * @param tuple4 input tuple whose first field is a grid cell id
         * @return (center longitude, center latitude, f1, f2, f3)
         */
        public Tuple5<Float, Float, Long, Boolean, Integer> map(Tuple4<Integer, Long, Boolean, Integer> tuple4) throws Exception {
            final int cellId = tuple4.f0;
            // Longitude is resolved before latitude, as in the tuple's field order.
            final float centerLon = GeoUtils.getGridCellCenterLon(cellId);
            final float centerLat = GeoUtils.getGridCellCenterLat(cellId);
            return new Tuple5<>(centerLon, centerLat, tuple4.f1, tuple4.f2, tuple4.f3);
        }
    }

    /* loaded from: input_file:com/dataartisans/flinktraining/exercises/datastream_java/windows/PopularPlaces$RideCounter.class */
    /**
     * Window function that counts the elements of a keyed time window.
     *
     * <p>The key is expected to be a {@code Tuple2<Integer, Boolean>}; its two fields are
     * copied into the output together with the window's end timestamp and the number of
     * elements in the window.
     *
     * <p>No explicit {@code apply(Object, Window, Iterable, Collector)} bridge is declared:
     * the compiler generates it, and declaring it by hand clashes with the erasure of
     * {@code WindowFunction.apply}.
     */
    public static class RideCounter implements WindowFunction<Tuple2<Integer, Boolean>, Tuple4<Integer, Long, Boolean, Integer>, Tuple, TimeWindow> {
        /**
         * Emits one record per window: (key.f0, window end, key.f1, element count).
         *
         * @param tuple      the window key; must be a {@code Tuple2<Integer, Boolean>}
         * @param timeWindow the window being evaluated; only its end timestamp is used
         * @param iterable   the elements assigned to the window; only counted, never inspected
         * @param collector  receives the single result tuple
         * @throws ClassCastException if {@code tuple} is not a {@code Tuple2}
         */
        public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<Integer, Boolean>> iterable, Collector<Tuple4<Integer, Long, Boolean, Integer>> collector) throws Exception {
            // The cast cannot be checked at runtime beyond Tuple2 itself; the field types
            // are fixed by how the caller keys the stream.
            @SuppressWarnings("unchecked")
            final Tuple2<Integer, Boolean> key = (Tuple2<Integer, Boolean>) tuple;

            // Unbox eagerly so a null key field fails here rather than downstream.
            final int cellId = key.f0;
            final boolean isStart = key.f1;
            final long windowEnd = timeWindow.getEnd();

            int rideCount = 0;
            for (Tuple2<Integer, Boolean> ignored : iterable) {
                rideCount++;
            }

            final Tuple4<Integer, Long, Boolean, Integer> result =
                    new Tuple4<>(cellId, windowEnd, isStart, rideCount);
            collector.collect(result);
        }
    }

    /**
     * Builds and runs the "Popular Places" streaming job.
     *
     * <p>Pipeline, in order: read rides from the file given by the required {@code input}
     * argument, apply {@code RideCleansing.NYCFilter}, map each ride to (grid cell, start
     * flag), key by both fields, count per sliding event-time window (15 minutes long,
     * sliding every 5 minutes), keep only counts of at least 20, convert the grid cell to
     * coordinates and print the result.
     *
     * @param strArr command-line arguments; must contain {@code --input <path>}
     * @throws Exception if argument parsing or job execution fails
     */
    public static void main(String[] strArr) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(strArr);
        final String inputPath = params.getRequired("input");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // NOTE(review): 60 and 600 are presumably the source's max event delay and
        // serving speed factor — confirm against TaxiRideSource's constructor.
        env.addSource(new TaxiRideSource(inputPath, 60, 600))
                .filter(new RideCleansing.NYCFilter())
                .map(new GridCellMatcher())
                .keyBy(0, 1)
                .timeWindow(Time.minutes(15L), Time.minutes(5L))
                .apply(new RideCounter())
                .filter(new FilterFunction<Tuple4<Integer, Long, Boolean, Integer>>() {
                    public boolean filter(Tuple4<Integer, Long, Boolean, Integer> tuple4) throws Exception {
                        // f3 is the per-window count produced by RideCounter.
                        return tuple4.f3 >= 20;
                    }
                })
                .map(new GridToCoordinates())
                .print();

        env.execute("Popular Places");
    }
}
