package com.dataartisans.flinktraining.exercises.datastream_java.sources;

import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/* loaded from: input_file:com/dataartisans/flinktraining/exercises/datastream_java/sources/CheckpointedTaxiRideSource.class */
public class CheckpointedTaxiRideSource implements SourceFunction<TaxiRide>, Checkpointed<Long> {
    private final String dataFilePath;
    private final int servingSpeed;
    private transient BufferedReader reader;
    private transient InputStream gzipStream;
    private long eventCnt;

    public CheckpointedTaxiRideSource(String str) {
        this(str, 1);
    }

    public CheckpointedTaxiRideSource(String str, int i) {
        this.eventCnt = 0L;
        this.dataFilePath = str;
        this.servingSpeed = i;
    }

    public void run(SourceFunction.SourceContext<TaxiRide> sourceContext) throws Exception {
        String readLine;
        String readLine2;
        Object checkpointLock = sourceContext.getCheckpointLock();
        this.gzipStream = new GZIPInputStream(new FileInputStream(this.dataFilePath));
        this.reader = new BufferedReader(new InputStreamReader(this.gzipStream, "UTF-8"));
        long j = 0;
        long j2 = 0;
        while (this.reader.ready() && (readLine2 = this.reader.readLine()) != null && j2 <= this.eventCnt) {
            j2++;
            j = getEventTime(TaxiRide.fromString(readLine2));
        }
        while (this.reader.ready() && (readLine = this.reader.readLine()) != null) {
            TaxiRide fromString = TaxiRide.fromString(readLine);
            long eventTime = getEventTime(fromString);
            Thread.sleep((eventTime - j) / this.servingSpeed);
            synchronized (checkpointLock) {
                this.eventCnt++;
                sourceContext.collectWithTimestamp(fromString, eventTime);
                sourceContext.emitWatermark(new Watermark(eventTime - 1));
            }
            j = eventTime;
        }
        this.reader.close();
        this.reader = null;
        this.gzipStream.close();
        this.gzipStream = null;
    }

    public long getEventTime(TaxiRide taxiRide) {
        return taxiRide.isStart ? taxiRide.startTime.getMillis() : taxiRide.endTime.getMillis();
    }

    public void cancel() {
        try {
            try {
                if (this.reader != null) {
                    this.reader.close();
                }
                if (this.gzipStream != null) {
                    this.gzipStream.close();
                }
            } catch (IOException e) {
                throw new RuntimeException("Could not cancel SourceFunction", e);
            }
        } finally {
            this.reader = null;
            this.gzipStream = null;
        }
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public Long m17snapshotState(long j, long j2) throws Exception {
        return Long.valueOf(this.eventCnt);
    }

    public void restoreState(Long l) throws Exception {
        this.eventCnt = l.longValue();
    }
}
