package org.gradoop.flink.model.impl.operators.subgraph;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.GraphHead;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.flink.model.api.operators.ApplicableUnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.InitGraphHead;
import org.gradoop.flink.model.impl.functions.epgm.PairElementWithNewId;
import org.gradoop.flink.model.impl.functions.tuple.Project2To1;
import org.gradoop.flink.model.impl.functions.tuple.Value0Of4;
import org.gradoop.flink.model.impl.operators.subgraph.functions.AddGraphsToElements;
import org.gradoop.flink.model.impl.operators.subgraph.functions.AddGraphsToElementsCoGroup;
import org.gradoop.flink.model.impl.operators.subgraph.functions.EdgesWithNewGraphsTuple;
import org.gradoop.flink.model.impl.operators.subgraph.functions.ElementIdGraphIdTuple;
import org.gradoop.flink.model.impl.operators.subgraph.functions.FilterEdgeGraphs;
import org.gradoop.flink.model.impl.operators.subgraph.functions.IdSourceTargetGraphTuple;
import org.gradoop.flink.model.impl.operators.subgraph.functions.JoinTuplesWithNewGraphs;
import org.gradoop.flink.model.impl.operators.subgraph.functions.JoinWithSourceGraphIdSet;
import org.gradoop.flink.model.impl.operators.subgraph.functions.JoinWithTargetGraphIdSet;
import org.gradoop.flink.model.impl.operators.subgraph.functions.MergeEdgeGraphs;
import org.gradoop.flink.model.impl.operators.subgraph.functions.MergeTupleGraphs;
import org.gradoop.flink.model.impl.operators.subgraph.functions.SourceTargetIdGraphsTuple;

/* loaded from: input_file:org/gradoop/flink/model/impl/operators/subgraph/ApplySubgraph.class */
public class ApplySubgraph implements ApplicableUnaryGraphToGraphOperator {
    private final FilterFunction<Vertex> vertexFilterFunction;
    private final FilterFunction<Edge> edgeFilterFunction;

    public ApplySubgraph(FilterFunction<Vertex> filterFunction, FilterFunction<Edge> filterFunction2) {
        if (filterFunction == null && filterFunction2 == null) {
            throw new IllegalArgumentException("No filter functions was given.");
        }
        this.vertexFilterFunction = filterFunction;
        this.edgeFilterFunction = filterFunction2;
    }

    @Override // org.gradoop.flink.model.api.operators.ApplicableUnaryGraphToGraphOperator
    public GraphCollection executeForGVELayout(GraphCollection graphCollection) {
        return (this.vertexFilterFunction == null || this.edgeFilterFunction == null) ? this.vertexFilterFunction != null ? vertexInducedSubgraph(graphCollection) : edgeInducedSubgraph(graphCollection) : subgraph(graphCollection);
    }

    @Override // org.gradoop.flink.model.api.operators.ApplicableUnaryGraphToGraphOperator
    public GraphCollection executeForTxLayout(GraphCollection graphCollection) {
        return executeForGVELayout(graphCollection);
    }

    private GraphCollection vertexInducedSubgraph(GraphCollection graphCollection) {
        MapOperator map = graphCollection.getGraphHeads().map(new Id()).map(new PairElementWithNewId());
        MapOperator map2 = map.map(new Project2To1()).map(new InitGraphHead(graphCollection.getConfig().getGraphHeadFactory()));
        GroupReduceOperator reduceGroup = graphCollection.getVertices().filter(this.vertexFilterFunction).flatMap(new ElementIdGraphIdTuple()).join(map).where(new int[]{1}).equalTo(new int[]{0}).with(new JoinTuplesWithNewGraphs()).groupBy(new int[]{0}).reduceGroup(new MergeTupleGraphs());
        return graphCollection.getConfig().getGraphCollectionFactory().fromDataSets((DataSet<GraphHead>) map2, (DataSet<Vertex>) reduceGroup.join(graphCollection.getVertices()).where(new int[]{0}).equalTo(new Id()).with(new AddGraphsToElements()), (DataSet<Edge>) graphCollection.getEdges().flatMap(new IdSourceTargetGraphTuple()).join(map).where(new int[]{3}).equalTo(new int[]{0}).with(new EdgesWithNewGraphsTuple()).groupBy(new Value0Of4()).reduceGroup(new MergeEdgeGraphs()).join(reduceGroup).where(new int[]{1}).equalTo(new int[]{0}).with(new JoinWithSourceGraphIdSet()).join(reduceGroup).where(new int[]{2}).equalTo(new int[]{0}).with(new JoinWithTargetGraphIdSet()).flatMap(new FilterEdgeGraphs()).join(graphCollection.getEdges()).where(new int[]{0}).equalTo(new Id()).with(new AddGraphsToElements()));
    }

    private GraphCollection edgeInducedSubgraph(GraphCollection graphCollection) {
        MapOperator map = graphCollection.getGraphHeads().map(new Id()).map(new PairElementWithNewId());
        MapOperator map2 = map.map(new Project2To1()).map(new InitGraphHead(graphCollection.getConfig().getGraphHeadFactory()));
        JoinOperator.EquiJoin with = graphCollection.getEdges().filter(this.edgeFilterFunction).flatMap(new ElementIdGraphIdTuple()).join(map).where(new int[]{1}).equalTo(new int[]{0}).with(new JoinTuplesWithNewGraphs()).groupBy(new int[]{0}).reduceGroup(new MergeTupleGraphs()).join(graphCollection.getEdges()).where(new int[]{0}).equalTo(new Id()).with(new AddGraphsToElements());
        return graphCollection.getConfig().getGraphCollectionFactory().fromDataSets((DataSet<GraphHead>) map2, (DataSet<Vertex>) with.flatMap(new SourceTargetIdGraphsTuple()).distinct(new int[]{0}).coGroup(graphCollection.getVertices()).where(new int[]{0}).equalTo(new Id()).with(new AddGraphsToElementsCoGroup()), (DataSet<Edge>) with);
    }

    private GraphCollection subgraph(GraphCollection graphCollection) {
        MapOperator map = graphCollection.getGraphHeads().map(new Id()).map(new PairElementWithNewId());
        return graphCollection.getConfig().getGraphCollectionFactory().fromDataSets((DataSet<GraphHead>) map.map(new Project2To1()).map(new InitGraphHead(graphCollection.getConfig().getGraphHeadFactory())), (DataSet<Vertex>) graphCollection.getVertices().filter(this.vertexFilterFunction).flatMap(new ElementIdGraphIdTuple()).join(map).where(new int[]{1}).equalTo(new int[]{0}).with(new JoinTuplesWithNewGraphs()).groupBy(new int[]{0}).reduceGroup(new MergeTupleGraphs()).join(graphCollection.getVertices()).where(new int[]{0}).equalTo(new Id()).with(new AddGraphsToElements()), (DataSet<Edge>) graphCollection.getEdges().filter(this.edgeFilterFunction).flatMap(new ElementIdGraphIdTuple()).join(map).where(new int[]{1}).equalTo(new int[]{0}).with(new JoinTuplesWithNewGraphs()).groupBy(new int[]{0}).reduceGroup(new MergeTupleGraphs()).join(graphCollection.getEdges()).where(new int[]{0}).equalTo(new Id()).with(new AddGraphsToElements()));
    }

    @Override // org.gradoop.flink.model.api.operators.Operator
    public String getName() {
        return ApplySubgraph.class.getName();
    }
}
