package io.tidb.bigdata.flink.tidb;

import io.tidb.bigdata.cdc.Key;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.util.ExceptionUtils;

/* loaded from: input_file:io/tidb/bigdata/flink/tidb/TiDBStreamingSourceFunction.class */
/**
 * Source function that first emits a batch snapshot read through a
 * {@link TiDBRowDataInputFormat} and afterwards hands over to a wrapped streaming
 * {@link SourceFunction}.
 *
 * <p>The wrapped function is obtained from a {@link SourceFunctionProvider}. Every optional
 * capability it may have ({@link CheckpointListener}, {@link CheckpointedFunction},
 * {@link AbstractRichFunction}, {@link ResultTypeQueryable}) is detected once in the constructor
 * and the corresponding callbacks are forwarded only when the capability is present.
 *
 * <p>While the snapshot phase is running, {@link #snapshotState} does not forward to the wrapped
 * function.
 */
public class TiDBStreamingSourceFunction extends RichSourceFunction<RowData> implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<RowData> {
    private final TiDBRowDataInputFormat inputFormat;
    private final SourceFunction<RowData> sourceFunction;
    private final CheckpointedFunction checkpointedFunction;
    private final CheckpointListener checkpointListener;
    private final AbstractRichFunction abstractRichFunction;
    private final ResultTypeQueryable<RowData> resultTypeQueryable;
    /** {@code true} until the batch snapshot phase has finished (successfully or not). */
    private final AtomicBoolean runningSnapshot = new AtomicBoolean(true);
    private final StreamingReadableMetadata[] metadata;
    private final long version;
    /** Set by {@link #cancel()} so that the batch loops and {@link #run} can stop promptly. */
    private volatile boolean canceled;

    /**
     * Creates the combined snapshot + streaming source.
     *
     * @param tiDBRowDataInputFormat input format used for the batch snapshot phase
     * @param streamingReadableMetadataArr metadata columns appended to every snapshot row, or
     *     {@code null} when no metadata columns are requested
     * @param j version used to derive the metadata values of snapshot rows
     * @param scanRuntimeProvider provider of the streaming source; must be a
     *     {@link SourceFunctionProvider}
     * @throws ClassCastException if {@code scanRuntimeProvider} is not a
     *     {@link SourceFunctionProvider}
     */
    public TiDBStreamingSourceFunction(TiDBRowDataInputFormat tiDBRowDataInputFormat, StreamingReadableMetadata[] streamingReadableMetadataArr, long j, ScanTableSource.ScanRuntimeProvider scanRuntimeProvider) {
        this.inputFormat = tiDBRowDataInputFormat;
        this.metadata = streamingReadableMetadataArr;
        this.version = j;

        final SourceFunction<RowData> delegate = ((SourceFunctionProvider) scanRuntimeProvider).createSourceFunction();
        this.sourceFunction = delegate;

        // Each optional capability is resolved once; a null field means "not supported".
        this.checkpointListener = delegate instanceof CheckpointListener ? (CheckpointListener) delegate : null;
        this.checkpointedFunction = delegate instanceof CheckpointedFunction ? (CheckpointedFunction) delegate : null;
        this.abstractRichFunction = delegate instanceof AbstractRichFunction ? (AbstractRichFunction) delegate : null;

        // The type argument cannot be verified at runtime; the delegate is a
        // SourceFunction<RowData>, so it is expected to describe RowData as well.
        @SuppressWarnings("unchecked")
        final ResultTypeQueryable<RowData> queryable = delegate instanceof ResultTypeQueryable ? (ResultTypeQueryable<RowData>) delegate : null;
        this.resultTypeQueryable = queryable;
    }

    /**
     * Stores the runtime context on this function and forwards it to the wrapped function when
     * that function is an {@link AbstractRichFunction}.
     */
    @Override
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        super.setRuntimeContext(runtimeContext);
        if (this.abstractRichFunction != null) {
            this.abstractRichFunction.setRuntimeContext(runtimeContext);
        }
    }

    /** Forwards the notification to the wrapped function if it is a {@link CheckpointListener}. */
    @Override
    public void notifyCheckpointComplete(long j) throws Exception {
        if (this.checkpointListener != null) {
            this.checkpointListener.notifyCheckpointComplete(j);
        }
    }

    /** Forwards the notification to the wrapped function if it is a {@link CheckpointListener}. */
    @Override
    public void notifyCheckpointAborted(long j) throws Exception {
        if (this.checkpointListener != null) {
            this.checkpointListener.notifyCheckpointAborted(j);
        }
    }

    /**
     * Returns the produced type reported by the wrapped function.
     *
     * @return the wrapped function's produced type, or {@code null} when the wrapped function is
     *     not {@link ResultTypeQueryable}
     */
    @Override
    public TypeInformation<RowData> getProducedType() {
        return this.resultTypeQueryable == null ? null : this.resultTypeQueryable.getProducedType();
    }

    /**
     * Forwards the snapshot request to the wrapped function, except while the batch snapshot
     * phase is still running or when the wrapped function is not a {@link CheckpointedFunction}.
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (this.checkpointedFunction != null && !this.runningSnapshot.get()) {
            this.checkpointedFunction.snapshotState(functionSnapshotContext);
        }
    }

    /** Forwards state initialization to the wrapped function if it is checkpointed. */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        if (this.checkpointedFunction != null) {
            this.checkpointedFunction.initializeState(functionInitializationContext);
        }
    }

    /**
     * Emits all records of the currently opened split, appending the given metadata values as
     * trailing fields of each row.
     *
     * @param sourceContext collector for the produced rows
     * @param objArr metadata values, written in order into the last {@code objArr.length} fields
     */
    private void runBatchSplitWithMetadata(SourceFunction.SourceContext<RowData> sourceContext, Object[] objArr) throws IOException {
        while (!this.canceled && !this.inputFormat.reachedEnd()) {
            // The factory allocates a row wide enough for the physical columns plus metadata.
            GenericRowData row = this.inputFormat.nextRecordWithFactory(num -> new GenericRowData(num.intValue() + objArr.length));
            final int firstMetadataField = row.getArity() - objArr.length;
            for (int k = 0; k < objArr.length; k++) {
                row.setField(firstMetadataField + k, objArr[k]);
            }
            sourceContext.collect(row);
        }
    }

    /** Emits all records of the currently opened split without any metadata columns. */
    private void runBatchSplit(SourceFunction.SourceContext<RowData> sourceContext) throws IOException {
        while (!this.canceled && !this.inputFormat.reachedEnd()) {
            sourceContext.collect(this.inputFormat.nextRecord((RowData) null));
        }
    }

    /**
     * Produces the value of one metadata column for snapshot rows, derived from {@link #version}.
     *
     * @throws IllegalStateException for metadata kinds other than the two handled here
     */
    private Object convertMetadata(StreamingReadableMetadata streamingReadableMetadata) {
        switch (streamingReadableMetadata) {
            case COMMIT_TIMESTAMP:
                return TimestampData.fromEpochMillis(Key.toTimestamp(this.version));
            case COMMIT_VERSION:
                return Long.valueOf(this.version);
            default:
                throw new IllegalStateException("Not supported metadata type:" + streamingReadableMetadata);
        }
    }

    /**
     * Runs the batch snapshot phase: opens the input format, reads every split and emits its
     * rows. The input format is always closed and the snapshot phase is always marked as finished
     * afterwards, even on failure.
     */
    private void runBatch(SourceFunction.SourceContext<RowData> sourceContext) throws Exception {
        try {
            final Object[] metadataValues = this.metadata == null
                    ? null
                    : Arrays.stream(this.metadata).map(this::convertMetadata).toArray(Object[]::new);
            this.inputFormat.openInputFormat();
            for (InputSplit inputSplit : this.inputFormat.createInputSplits(1)) {
                if (this.canceled) {
                    break;
                }
                this.inputFormat.open(inputSplit);
                try {
                    if (metadataValues == null) {
                        runBatchSplit(sourceContext);
                    } else {
                        runBatchSplitWithMetadata(sourceContext, metadataValues);
                    }
                } finally {
                    // Release the split even when reading it failed; close errors are best-effort.
                    ExceptionUtils.suppressExceptions(this.inputFormat::close);
                }
            }
        } finally {
            ExceptionUtils.suppressExceptions(this.inputFormat::closeInputFormat);
            this.runningSnapshot.set(false);
        }
    }

    /** Runs the wrapped streaming source, if there is one. */
    private void runStreaming(SourceFunction.SourceContext<RowData> sourceContext) throws Exception {
        if (this.sourceFunction != null) {
            this.sourceFunction.run(sourceContext);
        }
    }

    /**
     * Emits the batch snapshot and then runs the wrapped streaming source. The streaming phase is
     * not started when the source was canceled during the snapshot phase.
     */
    @Override
    public void run(SourceFunction.SourceContext<RowData> sourceContext) throws Exception {
        runBatch(sourceContext);
        if (!this.canceled) {
            runStreaming(sourceContext);
        }
    }

    /** Requests termination of the snapshot loops and cancels the wrapped streaming source. */
    @Override
    public void cancel() {
        this.canceled = true;
        if (this.sourceFunction != null) {
            this.sourceFunction.cancel();
        }
    }

    /** Opens the wrapped function when it is an {@link AbstractRichFunction}. */
    @Override
    public void open(Configuration configuration) throws Exception {
        if (this.abstractRichFunction != null) {
            this.abstractRichFunction.open(configuration);
        }
    }

    /** Closes the wrapped function when it is an {@link AbstractRichFunction}. */
    @Override
    public void close() throws Exception {
        if (this.abstractRichFunction != null) {
            this.abstractRichFunction.close();
        }
    }
}
