package com.dataartisans.flinktraining.exercises.datastream_java.sources;

import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Calendar;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.zip.GZIPInputStream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/* loaded from: input_file:com/dataartisans/flinktraining/exercises/datastream_java/sources/TaxiRideSource.class */
public class TaxiRideSource implements SourceFunction<TaxiRide> {
    private final int maxDelayMsecs;
    private final int watermarkDelayMSecs;
    private final String dataFilePath;
    private final int servingSpeed;
    private transient BufferedReader reader;
    private transient InputStream gzipStream;

    public TaxiRideSource(String str) {
        this(str, 0, 1);
    }

    public TaxiRideSource(String str, int i) {
        this(str, 0, i);
    }

    public TaxiRideSource(String str, int i, int i2) {
        if (i < 0) {
            throw new IllegalArgumentException("Max event delay must be positive");
        }
        this.dataFilePath = str;
        this.maxDelayMsecs = i * 1000;
        this.watermarkDelayMSecs = this.maxDelayMsecs < 10000 ? 10000 : this.maxDelayMsecs;
        this.servingSpeed = i2;
    }

    public void run(SourceFunction.SourceContext<TaxiRide> sourceContext) throws Exception {
        this.gzipStream = new GZIPInputStream(new FileInputStream(this.dataFilePath));
        this.reader = new BufferedReader(new InputStreamReader(this.gzipStream, "UTF-8"));
        generateUnorderedStream(sourceContext);
        this.reader.close();
        this.reader = null;
        this.gzipStream.close();
        this.gzipStream = null;
    }

    private void generateUnorderedStream(SourceFunction.SourceContext<TaxiRide> sourceContext) throws Exception {
        String readLine;
        String readLine2;
        String readLine3;
        long timeInMillis = Calendar.getInstance().getTimeInMillis();
        Random random = new Random(7452L);
        PriorityQueue priorityQueue = new PriorityQueue(32, new Comparator<Tuple2<Long, Object>>() { // from class: com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource.1
            @Override // java.util.Comparator
            public int compare(Tuple2<Long, Object> tuple2, Tuple2<Long, Object> tuple22) {
                return ((Long) tuple2.f0).compareTo((Long) tuple22.f0);
            }
        });
        if (!this.reader.ready() || (readLine = this.reader.readLine()) == null) {
            return;
        }
        TaxiRide fromString = TaxiRide.fromString(readLine);
        long eventTime = getEventTime(fromString);
        priorityQueue.add(new Tuple2(Long.valueOf(eventTime + getNormalDelayMsecs(random)), fromString));
        long j = eventTime + this.watermarkDelayMSecs;
        priorityQueue.add(new Tuple2(Long.valueOf(j), new Watermark((j - this.maxDelayMsecs) - 1)));
        if (this.reader.ready() && (readLine3 = this.reader.readLine()) != null) {
            fromString = TaxiRide.fromString(readLine3);
        }
        while (true) {
            if (priorityQueue.size() <= 0 && !this.reader.ready()) {
                return;
            }
            long longValue = !priorityQueue.isEmpty() ? ((Long) ((Tuple2) priorityQueue.peek()).f0).longValue() : -1L;
            long eventTime2 = fromString != null ? getEventTime(fromString) : -1L;
            while (true) {
                long j2 = eventTime2;
                if (fromString == null || (!priorityQueue.isEmpty() && j2 >= longValue + this.maxDelayMsecs)) {
                    break;
                }
                priorityQueue.add(new Tuple2(Long.valueOf(j2 + getNormalDelayMsecs(random)), fromString));
                if (!this.reader.ready() || (readLine2 = this.reader.readLine()) == null) {
                    fromString = null;
                    eventTime2 = -1;
                } else {
                    fromString = TaxiRide.fromString(readLine2);
                    eventTime2 = getEventTime(fromString);
                }
            }
            Tuple2 tuple2 = (Tuple2) priorityQueue.poll();
            long longValue2 = ((Long) tuple2.f0).longValue();
            long servingTime = toServingTime(timeInMillis, eventTime, longValue2) - Calendar.getInstance().getTimeInMillis();
            Thread.sleep(servingTime > 0 ? servingTime : 0L);
            if (tuple2.f1 instanceof TaxiRide) {
                TaxiRide taxiRide = (TaxiRide) tuple2.f1;
                sourceContext.collectWithTimestamp(taxiRide, getEventTime(taxiRide));
            } else if (tuple2.f1 instanceof Watermark) {
                sourceContext.emitWatermark((Watermark) tuple2.f1);
                long j3 = longValue2 + this.watermarkDelayMSecs;
                priorityQueue.add(new Tuple2(Long.valueOf(j3), new Watermark((j3 - this.maxDelayMsecs) - 1)));
            }
        }
    }

    public long toServingTime(long j, long j2, long j3) {
        return j + ((j3 - j2) / this.servingSpeed);
    }

    public long getEventTime(TaxiRide taxiRide) {
        return taxiRide.isStart ? taxiRide.startTime.getMillis() : taxiRide.endTime.getMillis();
    }

    public long getNormalDelayMsecs(Random random) {
        long j = -1;
        long j2 = this.maxDelayMsecs / 2;
        while (true) {
            if (j >= 0 && j <= this.maxDelayMsecs) {
                return j;
            }
            j = ((long) (random.nextGaussian() * j2)) + j2;
        }
    }

    public void cancel() {
        try {
            try {
                if (this.reader != null) {
                    this.reader.close();
                }
                if (this.gzipStream != null) {
                    this.gzipStream.close();
                }
            } catch (IOException e) {
                throw new RuntimeException("Could not cancel SourceFunction", e);
            }
        } finally {
            this.reader = null;
            this.gzipStream = null;
        }
    }
}
