package com.dataartisans.flinktraining.exercises.datastream_java.state;

import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.CheckpointedTaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.GeoUtils;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.TravelTimePredictionModel;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/* loaded from: input_file:com/dataartisans/flinktraining/exercises/datastream_java/state/TravelTimePrediction.class */
public class TravelTimePrediction {

    /* loaded from: input_file:com/dataartisans/flinktraining/exercises/datastream_java/state/TravelTimePrediction$GridCellMatcher.class */
    public static class GridCellMatcher implements MapFunction<TaxiRide, Tuple2<Integer, TaxiRide>> {
        public Tuple2<Integer, TaxiRide> map(TaxiRide taxiRide) throws Exception {
            return new Tuple2<>(Integer.valueOf(GeoUtils.mapToGridCell(taxiRide.endLon, taxiRide.endLat)), taxiRide);
        }
    }

    /* loaded from: input_file:com/dataartisans/flinktraining/exercises/datastream_java/state/TravelTimePrediction$NYCFilter.class */
    public static class NYCFilter implements FilterFunction<TaxiRide> {
        public boolean filter(TaxiRide taxiRide) throws Exception {
            return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
        }
    }

    /* loaded from: input_file:com/dataartisans/flinktraining/exercises/datastream_java/state/TravelTimePrediction$PredictionModel.class */
    public static class PredictionModel extends RichFlatMapFunction<Tuple2<Integer, TaxiRide>, Tuple2<Long, Integer>> {
        private transient ValueState<TravelTimePredictionModel> modelState;

        public void flatMap(Tuple2<Integer, TaxiRide> tuple2, Collector<Tuple2<Long, Integer>> collector) throws Exception {
            TravelTimePredictionModel travelTimePredictionModel = (TravelTimePredictionModel) this.modelState.value();
            TaxiRide taxiRide = (TaxiRide) tuple2.f1;
            double euclideanDistance = GeoUtils.getEuclideanDistance(taxiRide.startLon, taxiRide.startLat, taxiRide.endLon, taxiRide.endLat);
            int directionAngle = GeoUtils.getDirectionAngle(taxiRide.endLon, taxiRide.endLat, taxiRide.startLon, taxiRide.startLat);
            if (taxiRide.isStart) {
                collector.collect(new Tuple2(Long.valueOf(taxiRide.rideId), Integer.valueOf(travelTimePredictionModel.predictTravelTime(directionAngle, euclideanDistance))));
            } else {
                travelTimePredictionModel.refineModel(directionAngle, euclideanDistance, (taxiRide.endTime.getMillis() - taxiRide.startTime.getMillis()) / 60000.0d);
                this.modelState.update(travelTimePredictionModel);
            }
        }

        public void open(Configuration configuration) {
            this.modelState = getRuntimeContext().getState(new ValueStateDescriptor("regressionModel", TypeInformation.of(new TypeHint<TravelTimePredictionModel>() { // from class: com.dataartisans.flinktraining.exercises.datastream_java.state.TravelTimePrediction.PredictionModel.1
            }), new TravelTimePredictionModel()));
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<Integer, TaxiRide>) obj, (Collector<Tuple2<Long, Integer>>) collector);
        }
    }

    /**
     * Builds and runs the "Taxi Ride Prediction" streaming job.
     *
     * <p>Pipeline: checkpointed ride source, then {@link NYCFilter}, then
     * {@link GridCellMatcher}, keyed by tuple field 0 (the grid cell id), then
     * {@link PredictionModel}, printed to stdout.
     *
     * @param strArr command-line arguments; must contain the required {@code input} parameter
     * @throws Exception if the parameter is missing or job execution fails
     */
    public static void main(String[] strArr) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(strArr);
        String inputPath = params.getRequired("input");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Checkpoint interval is given in milliseconds.
        env.enableCheckpointing(5000L);
        // Fixed-delay restart: 60 attempts, 10 seconds apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10L, TimeUnit.SECONDS)));

        // NOTE(review): the meaning of the source's second argument (600) is not visible here;
        // presumably a serving speed factor - confirm against CheckpointedTaxiRideSource.
        env.addSource(new CheckpointedTaxiRideSource(inputPath, 600))
                .filter(new NYCFilter())
                .map(new GridCellMatcher())
                .keyBy(0)
                .flatMap(new PredictionModel())
                .print();

        env.execute("Taxi Ride Prediction");
    }
}
