package org.gradoop.flink.io.impl.hbase;

import java.io.IOException;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.GraphHead;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.common.storage.impl.hbase.HBaseEPGMStore;
import org.gradoop.flink.io.api.DataSink;
import org.gradoop.flink.io.impl.hbase.functions.BuildEdgeMutation;
import org.gradoop.flink.io.impl.hbase.functions.BuildGraphHeadMutation;
import org.gradoop.flink.io.impl.hbase.functions.BuildGraphTransactions;
import org.gradoop.flink.io.impl.hbase.functions.BuildPersistentEdge;
import org.gradoop.flink.io.impl.hbase.functions.BuildPersistentGraphHead;
import org.gradoop.flink.io.impl.hbase.functions.BuildPersistentVertex;
import org.gradoop.flink.io.impl.hbase.functions.BuildVertexDataWithEdges;
import org.gradoop.flink.io.impl.hbase.functions.BuildVertexMutation;
import org.gradoop.flink.io.impl.hbase.functions.EdgeSetBySourceId;
import org.gradoop.flink.io.impl.hbase.functions.EdgeSetByTargetId;
import org.gradoop.flink.model.api.epgm.GraphCollection;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.epgm.TargetId;
import org.gradoop.flink.model.impl.functions.graphcontainment.PairGraphIdWithElementId;
import org.gradoop.flink.util.GradoopFlinkConfig;

/* loaded from: input_file:org/gradoop/flink/io/impl/hbase/HBaseDataSink.class */
/**
 * Data sink that converts the graph heads, vertices and edges of a graph
 * collection into HBase mutations and hands them to a Hadoop
 * {@link TableOutputFormat}, one table per element kind. Table names, element
 * handlers and persistent-element factories are all taken from
 * {@code getHBaseConfig()}.
 */
public class HBaseDataSink extends HBaseBase<GraphHead, Vertex, Edge> implements DataSink {

    /**
     * Creates a new sink; both arguments are passed straight to the super class.
     *
     * @param hBaseEPGMStore     HBase store handed to the super class
     * @param gradoopFlinkConfig Gradoop Flink configuration handed to the super class
     */
    public HBaseDataSink(HBaseEPGMStore<GraphHead, Vertex, Edge> hBaseEPGMStore, GradoopFlinkConfig gradoopFlinkConfig) {
        super(hBaseEPGMStore, gradoopFlinkConfig);
    }

    /**
     * Writes a logical graph; equivalent to {@code write(logicalGraph, false)}.
     *
     * @param logicalGraph graph to write
     * @throws IOException if setting up the Hadoop output fails
     */
    public void write(LogicalGraph logicalGraph) throws IOException {
        write(logicalGraph, false);
    }

    /**
     * Writes a graph collection; equivalent to {@code write(graphCollection, false)}.
     *
     * @param graphCollection collection to write
     * @throws IOException if setting up the Hadoop output fails
     */
    public void write(GraphCollection graphCollection) throws IOException {
        write(graphCollection, false);
    }

    /**
     * Wraps the logical graph into a graph collection (via the collection
     * factory of the Flink config) and writes that collection.
     *
     * @param logicalGraph graph to write
     * @param z            forwarded unchanged to {@link #write(GraphCollection, boolean)}
     * @throws IOException if setting up the Hadoop output fails
     */
    public void write(LogicalGraph logicalGraph, boolean z) throws IOException {
        GraphCollection singleGraphCollection =
            getFlinkConfig().getGraphCollectionFactory().fromGraph(logicalGraph);
        write(singleGraphCollection, z);
    }

    /**
     * Registers the outputs for graph heads, vertices and edges of the given
     * collection, in that order.
     *
     * @param graphCollection collection to write
     * @param z               NOTE: this flag is not consulted by this implementation
     * @throws IOException if setting up the Hadoop output fails
     */
    public void write(GraphCollection graphCollection, boolean z) throws IOException {
        writeGraphHeads(graphCollection);
        writeVertices(graphCollection);
        writeEdges(graphCollection);
    }

    /**
     * Builds persistent graph heads and sends their mutations to the graph table.
     *
     * @param graphCollection collection whose graph heads are written
     * @throws IOException if setting up the Hadoop output fails
     */
    private void writeGraphHeads(GraphCollection graphCollection) throws IOException {
        // Pair every vertex and edge with the ids of its graphs, co-group both
        // sides on the graph id (field 0), then join the result with the graph
        // heads on their id.
        JoinOperator.EquiJoin persistentGraphHeads = graphCollection.getVertices()
            .flatMap(new PairGraphIdWithElementId())
            .coGroup(graphCollection.getEdges().flatMap(new PairGraphIdWithElementId()))
            .where(0)
            .equalTo(0)
            .with(new BuildGraphTransactions())
            .join(graphCollection.getGraphHeads())
            .where(0)
            .equalTo(new Id())
            .with(new BuildPersistentGraphHead(getHBaseConfig().getPersistentGraphHeadFactory()));

        HadoopOutputFormat graphTableOutput =
            createTableOutputFormat(getHBaseConfig().getGraphTableName());
        persistentGraphHeads
            .map(new BuildGraphHeadMutation(getHBaseConfig().getGraphHeadHandler()))
            .output(graphTableOutput);
    }

    /**
     * Builds persistent vertices and sends their mutations to the vertex table.
     *
     * @param graphCollection collection whose vertices are written
     * @throws IOException if setting up the Hadoop output fails
     */
    private void writeVertices(GraphCollection graphCollection) throws IOException {
        // First co-group: vertices with the edge sets grouped by source id.
        // Second co-group: that result (keyed by "f0.id") with the edge sets
        // grouped by target id.
        CoGroupOperator persistentVertices = graphCollection.getVertices()
            .coGroup(graphCollection.getEdges()
                .groupBy(new SourceId())
                .reduceGroup(new EdgeSetBySourceId()))
            .where(new Id())
            .equalTo(0)
            .with(new BuildVertexDataWithEdges())
            .coGroup(graphCollection.getEdges()
                .groupBy(new TargetId())
                .reduceGroup(new EdgeSetByTargetId()))
            .where("f0.id")
            .equalTo(0)
            .with(new BuildPersistentVertex(getHBaseConfig().getPersistentVertexFactory()));

        HadoopOutputFormat vertexTableOutput =
            createTableOutputFormat(getHBaseConfig().getVertexTableName());
        persistentVertices
            .map(new BuildVertexMutation(getHBaseConfig().getVertexHandler()))
            .output(vertexTableOutput);
    }

    /**
     * Builds persistent edges and sends their mutations to the edge table.
     *
     * @param graphCollection collection whose edges are written
     * @throws IOException if setting up the Hadoop output fails
     */
    private void writeEdges(GraphCollection graphCollection) throws IOException {
        // Join vertices with edges on vertex id = edge source id, then join the
        // pair with the vertices again on the edge's target id ("f1.targetId").
        JoinOperator.EquiJoin persistentEdges = graphCollection.getVertices()
            .join(graphCollection.getEdges())
            .where(new Id())
            .equalTo(new SourceId())
            .join(graphCollection.getVertices())
            .where("f1.targetId")
            .equalTo(new Id())
            .with(new BuildPersistentEdge(getHBaseConfig().getPersistentEdgeFactory()));

        HadoopOutputFormat edgeTableOutput =
            createTableOutputFormat(getHBaseConfig().getEdgeTableName());
        persistentEdges
            .map(new BuildEdgeMutation(getHBaseConfig().getEdgeHandler()))
            .output(edgeTableOutput);
    }

    /**
     * Creates a Hadoop output format, backed by a fresh {@link Job}, whose
     * {@link TableOutputFormat} is configured to write to the given table.
     *
     * @param tableName value stored under {@link TableOutputFormat#OUTPUT_TABLE}
     * @return output format wrapping a new {@link TableOutputFormat}
     * @throws IOException if the Hadoop job cannot be instantiated
     */
    private HadoopOutputFormat createTableOutputFormat(String tableName) throws IOException {
        Job job = Job.getInstance();
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName);
        return new HadoopOutputFormat(new TableOutputFormat(), job);
    }
}
