package org.gradoop.flink.io.impl.csv.indexed;

import java.io.IOException;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Preconditions;
import org.gradoop.flink.io.api.DataSink;
import org.gradoop.flink.io.impl.csv.CSVBase;
import org.gradoop.flink.io.impl.csv.CSVConstants;
import org.gradoop.flink.io.impl.csv.functions.EdgeToCSVEdge;
import org.gradoop.flink.io.impl.csv.functions.GraphHeadToCSVGraphHead;
import org.gradoop.flink.io.impl.csv.functions.VertexToCSVVertex;
import org.gradoop.flink.io.impl.csv.indexed.functions.IndexedCSVFileFormat;
import org.gradoop.flink.io.impl.csv.metadata.MetaData;
import org.gradoop.flink.model.api.epgm.GraphCollection;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.util.GradoopFlinkConfig;

/* loaded from: input_file:org/gradoop/flink/io/impl/csv/indexed/IndexedCSVDataSink.class */
/**
 * {@link DataSink} that writes the graph heads, vertices and edges of a graph or graph
 * collection through {@link IndexedCSVFileFormat}, together with a metadata CSV file.
 *
 * <p>The metadata is either derived from the graph collection being written or, when a
 * non-empty metadata path was passed to the constructor, read from that existing file.
 */
public class IndexedCSVDataSink extends CSVBase implements DataSink {
    /** Name under which the metadata data set is broadcast to the CSV mapping functions. */
    private static final String METADATA_BROADCAST_NAME = "metadata";

    /** Field delimiter used for the metadata file and the element files. */
    private static final String FIELD_DELIMITER = ";";

    /** Path of an existing metadata file to reuse; {@code null} or empty means "create new". */
    private final String metaDataPath;

    /**
     * Creates a sink that derives the metadata from the data being written.
     *
     * @param str                path handed to {@link CSVBase}
     * @param gradoopFlinkConfig Gradoop Flink configuration
     */
    public IndexedCSVDataSink(String str, GradoopFlinkConfig gradoopFlinkConfig) {
        this(str, null, gradoopFlinkConfig);
    }

    /**
     * Creates a sink that may reuse an existing metadata file.
     *
     * @param str                path handed to {@link CSVBase}
     * @param str2               path of an existing metadata file; {@code null} or empty to
     *                           create the metadata from the written data instead
     * @param gradoopFlinkConfig Gradoop Flink configuration
     */
    public IndexedCSVDataSink(String str, String str2, GradoopFlinkConfig gradoopFlinkConfig) {
        super(str, gradoopFlinkConfig);
        this.metaDataPath = str2;
    }

    /**
     * Writes a logical graph using {@link FileSystem.WriteMode#NO_OVERWRITE}.
     *
     * @param logicalGraph graph to write
     * @throws IOException declared by the {@link DataSink} contract
     */
    @Override // org.gradoop.flink.io.api.DataSink
    public void write(LogicalGraph logicalGraph) throws IOException {
        write(logicalGraph, false);
    }

    /**
     * Writes a graph collection using {@link FileSystem.WriteMode#NO_OVERWRITE}.
     *
     * @param graphCollection collection to write
     * @throws IOException declared by the {@link DataSink} contract
     */
    @Override // org.gradoop.flink.io.api.DataSink
    public void write(GraphCollection graphCollection) throws IOException {
        write(graphCollection, false);
    }

    /**
     * Wraps the logical graph into a graph collection and writes that collection.
     *
     * @param logicalGraph graph to write
     * @param z            {@code true} to overwrite existing files
     * @throws IOException declared by the {@link DataSink} contract
     */
    @Override // org.gradoop.flink.io.api.DataSink
    public void write(LogicalGraph logicalGraph, boolean z) throws IOException {
        GraphCollection singleGraphCollection =
            logicalGraph.getConfig().getGraphCollectionFactory().fromGraph(logicalGraph);
        write(singleGraphCollection, z);
    }

    /**
     * Registers the sinks for the metadata file (unless it is reused in place) and for the
     * graph head, vertex and edge files.
     *
     * @param graphCollection collection to write
     * @param z               {@code true} selects {@link FileSystem.WriteMode#OVERWRITE},
     *                        {@code false} selects {@link FileSystem.WriteMode#NO_OVERWRITE}
     * @throws IOException declared by the {@link DataSink} contract
     */
    @Override // org.gradoop.flink.io.api.DataSink
    public void write(GraphCollection graphCollection, boolean z) throws IOException {
        FileSystem.WriteMode writeMode;
        if (z) {
            writeMode = FileSystem.WriteMode.OVERWRITE;
        } else {
            writeMode = FileSystem.WriteMode.NO_OVERWRITE;
        }

        DataSet<Tuple3<String, String, String>> metaData = resolveMetaData(graphCollection);

        // Every mapper receives the metadata as a broadcast set.
        SingleInputUdfOperator csvGraphHeads = graphCollection.getGraphHeads()
            .map(new GraphHeadToCSVGraphHead())
            .withBroadcastSet(metaData, METADATA_BROADCAST_NAME);
        SingleInputUdfOperator csvVertices = graphCollection.getVertices()
            .map(new VertexToCSVVertex())
            .withBroadcastSet(metaData, METADATA_BROADCAST_NAME);
        SingleInputUdfOperator csvEdges = graphCollection.getEdges()
            .map(new EdgeToCSVEdge())
            .withBroadcastSet(metaData, METADATA_BROADCAST_NAME);

        // Skip the metadata sink only when the reused file already is the target file.
        boolean metaDataAlreadyInPlace =
            getMetaDataPath().equals(this.metaDataPath) && reuseMetadata();
        if (!metaDataAlreadyInPlace) {
            metaData
                .writeAsCsv(getMetaDataPath(), CSVConstants.ROW_DELIMITER, FIELD_DELIMITER, writeMode)
                .setParallelism(1);
        }

        csvGraphHeads.output(internalWriteAsIndexedCsv(
            csvGraphHeads, new Path(getGraphHeadPath()),
            CSVConstants.ROW_DELIMITER, FIELD_DELIMITER, writeMode));
        csvVertices.output(internalWriteAsIndexedCsv(
            csvVertices, new Path(getVertexPath()),
            CSVConstants.ROW_DELIMITER, FIELD_DELIMITER, writeMode));
        csvEdges.output(internalWriteAsIndexedCsv(
            csvEdges, new Path(getEdgePath()),
            CSVConstants.ROW_DELIMITER, FIELD_DELIMITER, writeMode));
    }

    /**
     * Provides the metadata data set: read from the configured metadata file when one is
     * reused, otherwise created from the given graph collection.
     *
     * @param graphCollection collection the metadata is created from when nothing is reused
     * @return metadata data set
     * @throws IOException passed through from the metadata helpers, if they raise it
     */
    private DataSet<Tuple3<String, String, String>> resolveMetaData(GraphCollection graphCollection)
        throws IOException {
        if (reuseMetadata()) {
            return MetaData.fromFile(this.metaDataPath, getConfig());
        }
        return createMetaData(graphCollection);
    }

    /**
     * Tells whether an existing metadata file was configured.
     *
     * @return {@code true} if the metadata path is neither {@code null} nor empty
     */
    private boolean reuseMetadata() {
        return this.metaDataPath != null && !this.metaDataPath.isEmpty();
    }

    /**
     * Builds the indexed CSV output format for a tuple data set.
     *
     * @param dataSet        data set to be written; must have a tuple type
     * @param filePath       target path
     * @param rowDelimiter   delimiter between rows
     * @param fieldDelimiter delimiter between fields
     * @param writeMode      write mode applied to the format
     * @param <X>            tuple type of the written records
     * @return configured output format
     * @throws IllegalArgumentException if the data set does not have a tuple type
     */
    private <X extends Tuple> IndexedCSVFileFormat<X> internalWriteAsIndexedCsv(
        DataSet dataSet, Path filePath, String rowDelimiter, String fieldDelimiter,
        FileSystem.WriteMode writeMode) {
        boolean isTupleDataSet = dataSet.getType().isTupleType();
        Preconditions.checkArgument(isTupleDataSet,
            "The writeAsCsv() method can only be used on data sets of tuples.");

        IndexedCSVFileFormat<X> outputFormat =
            new IndexedCSVFileFormat<>(filePath, rowDelimiter, fieldDelimiter);
        outputFormat.setWriteMode(writeMode);
        return outputFormat;
    }
}
