package org.gradoop.storage.impl.hbase.io;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang.NotImplementedException;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.gradoop.common.GradoopTestUtils;
import org.gradoop.common.model.impl.id.GradoopIdSet;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.GraphHead;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.api.epgm.GraphCollection;
import org.gradoop.flink.util.FlinkAsciiGraphLoader;
import org.gradoop.flink.util.GradoopFlinkConfig;
import org.gradoop.storage.common.predicate.query.Query;
import org.gradoop.storage.impl.hbase.GradoopHBaseTestBase;
import org.gradoop.storage.impl.hbase.HBaseEPGMStore;
import org.gradoop.storage.utils.HBaseFilters;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

/**
 * Tests reading graph collections through {@link HBaseDataSource} (with and without
 * pushed-down predicates) and writing them through {@link HBaseDataSink}.
 *
 * <p>Before every test a store is opened under the prefix {@code "HBaseDataSinkSourceTest."}
 * and populated via {@link GradoopHBaseTestBase#writeSocialGraphToStore}; it is closed again
 * after every test. Test methods run in ascending name order.
 */
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class HBaseDataSinkSourceTest extends GradoopFlinkTestBase {

    /** Flink configuration handed to the ASCII graph loader in {@link #testWriteToSink()}. */
    private final GradoopFlinkConfig config = GradoopFlinkConfig.createConfig(getExecutionEnvironment());

    /** Store queried by the read tests; opened in {@link #setUp()}, closed in {@link #tearDown()}. */
    private HBaseEPGMStore epgmStore;

    /**
     * Opens the shared store and writes the social graph into it.
     *
     * @throws IOException if opening or populating the store fails
     */
    @Before
    public void setUp() throws IOException {
        this.epgmStore = GradoopHBaseTestBase.openEPGMStore("HBaseDataSinkSourceTest.");
        GradoopHBaseTestBase.writeSocialGraphToStore(this.epgmStore);
    }

    /**
     * Closes the shared store if {@link #setUp()} managed to open it.
     *
     * @throws IOException if closing the store fails
     */
    @After
    public void tearDown() throws IOException {
        if (this.epgmStore != null) {
            this.epgmStore.close();
        }
    }

    /**
     * Reads the whole store without any predicate and expects the complete social graph.
     */
    @Test
    public void testReadFromSource() throws Exception {
        assertSourceYieldsWholeSocialGraph(createSourceOnSharedStore().getGraphCollection());
    }

    /**
     * Applies "no filter" queries for graphs, edges and vertices; the source must report the
     * filter as pushed down and still return the complete social graph.
     */
    @Test
    public void testReadFromSourceWithEmptyPredicates() throws Exception {
        HBaseDataSource source = createSourceOnSharedStore()
            .applyGraphPredicate(Query.elements().fromAll().noFilter())
            .applyEdgePredicate(Query.elements().fromAll().noFilter())
            .applyVertexPredicate(Query.elements().fromAll().noFilter());
        Assert.assertTrue(source.isFilterPushedDown());

        assertSourceYieldsWholeSocialGraph(source.getGraphCollection());
    }

    /**
     * Restricts graph heads to an id set; vertices and edges are expected unfiltered.
     */
    @Test
    public void testReadWithGraphIdPredicate() throws Throwable {
        List<GraphHead> allGraphHeads = new ArrayList<>(GradoopHBaseTestBase.getSocialGraphHeads());
        List<GraphHead> expectedGraphHeads = allGraphHeads.subList(1, 3);
        GradoopIdSet graphIds = GradoopIdSet.fromExisting(
            expectedGraphHeads.stream().map(GraphHead::getId).collect(Collectors.toList()));

        HBaseDataSource source = createSourceOnSharedStore()
            .applyGraphPredicate(Query.elements().fromSets(graphIds).noFilter());
        Assert.assertTrue(source.isFilterPushedDown());

        GraphCollection graphCollection = source.getGraphCollection();
        List<GraphHead> loadedGraphHeads = graphCollection.getGraphHeads().collect();
        List<Vertex> loadedVertices = graphCollection.getVertices().collect();
        List<Edge> loadedEdges = graphCollection.getEdges().collect();

        validateAgainstExpected(
            expectedGraphHeads, loadedGraphHeads,
            GradoopHBaseTestBase.getSocialVertices(), loadedVertices,
            GradoopHBaseTestBase.getSocialEdges(), loadedEdges);
    }

    /**
     * Restricts vertices to an id set; graph heads and edges are expected unfiltered.
     */
    @Test
    public void testReadWithVertexIdPredicate() throws Throwable {
        List<Vertex> allVertices = new ArrayList<>(GradoopHBaseTestBase.getSocialVertices());
        List<Vertex> expectedVertices = allVertices.subList(0, 3);
        GradoopIdSet vertexIds = GradoopIdSet.fromExisting(
            expectedVertices.stream().map(Vertex::getId).collect(Collectors.toList()));

        HBaseDataSource source = createSourceOnSharedStore()
            .applyVertexPredicate(Query.elements().fromSets(vertexIds).noFilter());
        Assert.assertTrue(source.isFilterPushedDown());

        GraphCollection graphCollection = source.getGraphCollection();
        List<GraphHead> loadedGraphHeads = graphCollection.getGraphHeads().collect();
        List<Vertex> loadedVertices = graphCollection.getVertices().collect();
        List<Edge> loadedEdges = graphCollection.getEdges().collect();

        validateAgainstExpected(
            GradoopHBaseTestBase.getSocialGraphHeads(), loadedGraphHeads,
            expectedVertices, loadedVertices,
            GradoopHBaseTestBase.getSocialEdges(), loadedEdges);
    }

    /**
     * Restricts edges to an id set; graph heads and vertices are expected unfiltered.
     */
    @Test
    public void testReadWithEdgeIdPredicate() throws Throwable {
        List<Edge> allEdges = new ArrayList<>(GradoopHBaseTestBase.getSocialEdges());
        List<Edge> expectedEdges = allEdges.subList(0, 3);
        GradoopIdSet edgeIds = GradoopIdSet.fromExisting(
            expectedEdges.stream().map(Edge::getId).collect(Collectors.toList()));

        HBaseDataSource source = createSourceOnSharedStore()
            .applyEdgePredicate(Query.elements().fromSets(edgeIds).noFilter());
        Assert.assertTrue(source.isFilterPushedDown());

        GraphCollection graphCollection = source.getGraphCollection();
        List<GraphHead> loadedGraphHeads = graphCollection.getGraphHeads().collect();
        List<Vertex> loadedVertices = graphCollection.getVertices().collect();
        List<Edge> loadedEdges = graphCollection.getEdges().collect();

        validateAgainstExpected(
            GradoopHBaseTestBase.getSocialGraphHeads(), loadedGraphHeads,
            GradoopHBaseTestBase.getSocialVertices(), loadedVertices,
            expectedEdges, loadedEdges);
    }

    /**
     * Filters graphs, edges and vertices by exact label membership and compares the result
     * with the same selection computed in memory.
     */
    @Test
    public void testReadWithLabelInPredicate() throws Exception {
        List<GraphHead> expectedGraphHeads = GradoopHBaseTestBase.getSocialGraphHeads().stream()
            .filter(head -> head.getLabel().equals(GradoopHBaseTestBase.LABEL_FORUM))
            .collect(Collectors.toList());
        List<Edge> expectedEdges = GradoopHBaseTestBase.getSocialEdges().stream()
            .filter(edge -> edge.getLabel().equals(GradoopHBaseTestBase.LABEL_HAS_MODERATOR)
                || edge.getLabel().equals(GradoopHBaseTestBase.LABEL_HAS_MEMBER))
            .collect(Collectors.toList());
        List<Vertex> expectedVertices = GradoopHBaseTestBase.getSocialVertices().stream()
            .filter(vertex -> vertex.getLabel().equals(GradoopHBaseTestBase.LABEL_TAG)
                || vertex.getLabel().equals(GradoopHBaseTestBase.LABEL_FORUM))
            .collect(Collectors.toList());

        HBaseDataSource source = createSourceOnSharedStore()
            .applyGraphPredicate(Query.elements().fromAll().where(HBaseFilters.labelIn(new String[]{GradoopHBaseTestBase.LABEL_FORUM})))
            .applyEdgePredicate(Query.elements().fromAll().where(HBaseFilters.labelIn(new String[]{GradoopHBaseTestBase.LABEL_HAS_MODERATOR, GradoopHBaseTestBase.LABEL_HAS_MEMBER})))
            .applyVertexPredicate(Query.elements().fromAll().where(HBaseFilters.labelIn(new String[]{GradoopHBaseTestBase.LABEL_TAG, GradoopHBaseTestBase.LABEL_FORUM})));
        Assert.assertTrue(source.isFilterPushedDown());

        GraphCollection graphCollection = source.getGraphCollection();
        List<GraphHead> loadedGraphHeads = graphCollection.getGraphHeads().collect();
        List<Vertex> loadedVertices = graphCollection.getVertices().collect();
        List<Edge> loadedEdges = graphCollection.getEdges().collect();

        validateAgainstExpected(
            expectedGraphHeads, loadedGraphHeads,
            expectedVertices, loadedVertices,
            expectedEdges, loadedEdges);
    }

    /**
     * Filters graphs, edges and vertices by label regular expressions and compares the result
     * with the same selection computed in memory.
     */
    @Test
    public void testReadWithLabelRegPredicate() throws Exception {
        List<GraphHead> expectedGraphHeads = GradoopHBaseTestBase.getSocialGraphHeads().stream()
            .filter(head -> GradoopHBaseTestBase.PATTERN_GRAPH.matcher(head.getLabel()).matches())
            .collect(Collectors.toList());
        List<Edge> expectedEdges = GradoopHBaseTestBase.getSocialEdges().stream()
            .filter(edge -> GradoopHBaseTestBase.PATTERN_EDGE.matcher(edge.getLabel()).matches())
            .collect(Collectors.toList());
        List<Vertex> expectedVertices = GradoopHBaseTestBase.getSocialVertices().stream()
            .filter(vertex -> GradoopHBaseTestBase.PATTERN_VERTEX.matcher(vertex.getLabel()).matches())
            .collect(Collectors.toList());

        HBaseDataSource source = createSourceOnSharedStore()
            .applyGraphPredicate(Query.elements().fromAll().where(HBaseFilters.labelReg(GradoopHBaseTestBase.PATTERN_GRAPH)))
            .applyEdgePredicate(Query.elements().fromAll().where(HBaseFilters.labelReg(GradoopHBaseTestBase.PATTERN_EDGE)))
            .applyVertexPredicate(Query.elements().fromAll().where(HBaseFilters.labelReg(GradoopHBaseTestBase.PATTERN_VERTEX)));
        Assert.assertTrue(source.isFilterPushedDown());

        GraphCollection graphCollection = source.getGraphCollection();
        List<GraphHead> loadedGraphHeads = graphCollection.getGraphHeads().collect();
        List<Vertex> loadedVertices = graphCollection.getVertices().collect();
        List<Edge> loadedEdges = graphCollection.getEdges().collect();

        validateAgainstExpected(
            expectedGraphHeads, loadedGraphHeads,
            expectedVertices, loadedVertices,
            expectedEdges, loadedEdges);
    }

    /**
     * Filters graphs, edges and vertices by property equality and compares the result with the
     * same selection computed in memory.
     */
    @Test
    public void testReadWithPropEqualsPredicate() throws Exception {
        PropertyValue vertexCount = PropertyValue.create(3);
        PropertyValue since = PropertyValue.create(2013);
        PropertyValue city = PropertyValue.create("Leipzig");

        // hasProperty is checked first so elements lacking the key are skipped, not dereferenced.
        List<GraphHead> expectedGraphHeads = GradoopHBaseTestBase.getSocialGraphHeads().stream()
            .filter(head -> head.hasProperty(GradoopHBaseTestBase.PROP_VERTEX_COUNT)
                && head.getPropertyValue(GradoopHBaseTestBase.PROP_VERTEX_COUNT).equals(vertexCount))
            .collect(Collectors.toList());
        List<Edge> expectedEdges = GradoopHBaseTestBase.getSocialEdges().stream()
            .filter(edge -> edge.hasProperty(GradoopHBaseTestBase.PROP_SINCE)
                && edge.getPropertyValue(GradoopHBaseTestBase.PROP_SINCE).equals(since))
            .collect(Collectors.toList());
        List<Vertex> expectedVertices = GradoopHBaseTestBase.getSocialVertices().stream()
            .filter(vertex -> vertex.hasProperty(GradoopHBaseTestBase.PROP_CITY)
                && vertex.getPropertyValue(GradoopHBaseTestBase.PROP_CITY).equals(city))
            .collect(Collectors.toList());

        HBaseDataSource source = createSourceOnSharedStore()
            .applyGraphPredicate(Query.elements().fromAll().where(HBaseFilters.propEquals(GradoopHBaseTestBase.PROP_VERTEX_COUNT, vertexCount)))
            .applyEdgePredicate(Query.elements().fromAll().where(HBaseFilters.propEquals(GradoopHBaseTestBase.PROP_SINCE, since)))
            .applyVertexPredicate(Query.elements().fromAll().where(HBaseFilters.propEquals(GradoopHBaseTestBase.PROP_CITY, city)));
        Assert.assertTrue(source.isFilterPushedDown());

        GraphCollection graphCollection = source.getGraphCollection();
        List<GraphHead> loadedGraphHeads = graphCollection.getGraphHeads().collect();
        List<Vertex> loadedVertices = graphCollection.getVertices().collect();
        List<Edge> loadedEdges = graphCollection.getEdges().collect();

        validateAgainstExpected(
            expectedGraphHeads, loadedGraphHeads,
            expectedVertices, loadedVertices,
            expectedEdges, loadedEdges);
    }

    /**
     * Filters by "property larger than": the graph filter is built with the boolean flag set to
     * {@code true} and mirrored in memory with {@code >=}; the edge and vertex filters use
     * {@code false} and are mirrored with {@code >}.
     */
    @Test
    public void testReadWithPropLargerThanPredicate() throws Exception {
        PropertyValue vertexCount = PropertyValue.create(3);
        PropertyValue since = PropertyValue.create(2014);
        PropertyValue age = PropertyValue.create(30);

        List<GraphHead> expectedGraphHeads = GradoopHBaseTestBase.getSocialGraphHeads().stream()
            .filter(head -> head.hasProperty(GradoopHBaseTestBase.PROP_VERTEX_COUNT)
                && head.getPropertyValue(GradoopHBaseTestBase.PROP_VERTEX_COUNT).compareTo(vertexCount) >= 0)
            .collect(Collectors.toList());
        List<Edge> expectedEdges = GradoopHBaseTestBase.getSocialEdges().stream()
            .filter(edge -> edge.hasProperty(GradoopHBaseTestBase.PROP_SINCE)
                && edge.getPropertyValue(GradoopHBaseTestBase.PROP_SINCE).compareTo(since) > 0)
            .collect(Collectors.toList());
        List<Vertex> expectedVertices = GradoopHBaseTestBase.getSocialVertices().stream()
            .filter(vertex -> vertex.hasProperty(GradoopHBaseTestBase.PROP_AGE)
                && vertex.getPropertyValue(GradoopHBaseTestBase.PROP_AGE).compareTo(age) > 0)
            .collect(Collectors.toList());

        HBaseDataSource source = createSourceOnSharedStore()
            .applyGraphPredicate(Query.elements().fromAll().where(HBaseFilters.propLargerThan(GradoopHBaseTestBase.PROP_VERTEX_COUNT, vertexCount, true)))
            .applyEdgePredicate(Query.elements().fromAll().where(HBaseFilters.propLargerThan(GradoopHBaseTestBase.PROP_SINCE, since, false)))
            .applyVertexPredicate(Query.elements().fromAll().where(HBaseFilters.propLargerThan(GradoopHBaseTestBase.PROP_AGE, age, false)));
        Assert.assertTrue(source.isFilterPushedDown());

        GraphCollection graphCollection = source.getGraphCollection();
        List<GraphHead> loadedGraphHeads = graphCollection.getGraphHeads().collect();
        List<Vertex> loadedVertices = graphCollection.getVertices().collect();
        List<Edge> loadedEdges = graphCollection.getEdges().collect();

        validateAgainstExpected(
            expectedGraphHeads, loadedGraphHeads,
            expectedVertices, loadedVertices,
            expectedEdges, loadedEdges);
    }

    /**
     * Filters graphs, edges and vertices by regular expressions on string properties; expects
     * one graph head, two edges and two vertices.
     */
    @Test
    public void testReadWithPropRegPredicate() throws Exception {
        List<GraphHead> expectedGraphHeads = GradoopHBaseTestBase.getSocialGraphHeads().stream()
            .filter(head -> head.hasProperty(GradoopHBaseTestBase.PROP_INTEREST)
                && head.getPropertyValue(GradoopHBaseTestBase.PROP_INTEREST).getString().matches(GradoopHBaseTestBase.PATTERN_GRAPH_PROP.pattern()))
            .collect(Collectors.toList());
        List<Edge> expectedEdges = GradoopHBaseTestBase.getSocialEdges().stream()
            .filter(edge -> edge.hasProperty(GradoopHBaseTestBase.PROP_STATUS)
                && edge.getPropertyValue(GradoopHBaseTestBase.PROP_STATUS).getString().matches(GradoopHBaseTestBase.PATTERN_EDGE_PROP.pattern()))
            .collect(Collectors.toList());
        List<Vertex> expectedVertices = GradoopHBaseTestBase.getSocialVertices().stream()
            .filter(vertex -> vertex.hasProperty(GradoopHBaseTestBase.PROP_NAME)
                && vertex.getPropertyValue(GradoopHBaseTestBase.PROP_NAME).getString().matches(GradoopHBaseTestBase.PATTERN_VERTEX_PROP.pattern()))
            .collect(Collectors.toList());

        HBaseDataSource source = createSourceOnSharedStore()
            .applyGraphPredicate(Query.elements().fromAll().where(HBaseFilters.propReg(GradoopHBaseTestBase.PROP_INTEREST, GradoopHBaseTestBase.PATTERN_GRAPH_PROP)))
            .applyEdgePredicate(Query.elements().fromAll().where(HBaseFilters.propReg(GradoopHBaseTestBase.PROP_STATUS, GradoopHBaseTestBase.PATTERN_EDGE_PROP)))
            .applyVertexPredicate(Query.elements().fromAll().where(HBaseFilters.propReg(GradoopHBaseTestBase.PROP_NAME, GradoopHBaseTestBase.PATTERN_VERTEX_PROP)));
        Assert.assertTrue(source.isFilterPushedDown());

        GraphCollection graphCollection = source.getGraphCollection();
        List<GraphHead> loadedGraphHeads = graphCollection.getGraphHeads().collect();
        List<Vertex> loadedVertices = graphCollection.getVertices().collect();
        List<Edge> loadedEdges = graphCollection.getEdges().collect();

        Assert.assertEquals(1L, loadedGraphHeads.size());
        Assert.assertEquals(2L, loadedEdges.size());
        Assert.assertEquals(2L, loadedVertices.size());

        validateAgainstExpected(
            expectedGraphHeads, loadedGraphHeads,
            expectedVertices, loadedVertices,
            expectedEdges, loadedEdges);
    }

    /**
     * Combines filters with {@code and}, {@code or} and {@code negate}; expects one graph head,
     * twenty-one edges and three vertices.
     */
    @Test
    public void testReadWithChainedPredicates() throws Exception {
        // Community graphs whose interest is neither "Hadoop" nor "Graphs".
        List<GraphHead> expectedGraphHeads = GradoopHBaseTestBase.getSocialGraphHeads().stream()
            .filter(head -> head.getLabel().equals("Community"))
            .filter(head -> {
                String interest = head.getPropertyValue(GradoopHBaseTestBase.PROP_INTEREST).getString();
                return !(interest.equals("Hadoop") || interest.equals("Graphs"));
            })
            .collect(Collectors.toList());

        // Edges matching the label pattern, or carrying a "since" value below 2015.
        List<Edge> expectedEdges = GradoopHBaseTestBase.getSocialEdges().stream()
            .filter(edge -> edge.getLabel().matches(GradoopHBaseTestBase.PATTERN_EDGE.pattern())
                || (edge.hasProperty(GradoopHBaseTestBase.PROP_SINCE)
                    && edge.getPropertyValue(GradoopHBaseTestBase.PROP_SINCE).getInt() < 2015))
            .collect(Collectors.toList());

        // Three "Person" vertices, later selected in the store by their name property.
        List<Vertex> persons = GradoopHBaseTestBase.getSocialVertices().stream()
            .filter(vertex -> vertex.getLabel().equals("Person"))
            .collect(Collectors.toList());
        List<Vertex> expectedVertices = persons.subList(1, 4);

        // NOTE(review): the filters that start an and()/or() chain have no target type to infer
        // from. If HBaseFilters' factory methods are generic, explicit type witnesses may be
        // needed here -- confirm against the HBaseFilters signatures.
        HBaseDataSource source = createSourceOnSharedStore()
            .applyGraphPredicate(Query.elements().fromAll().where(
                HBaseFilters.labelIn(new String[]{"Community"})
                    .and(HBaseFilters.propEquals(GradoopHBaseTestBase.PROP_INTEREST, "Hadoop")
                        .or(HBaseFilters.propEquals(GradoopHBaseTestBase.PROP_INTEREST, "Graphs")).negate())))
            .applyEdgePredicate(Query.elements().fromAll().where(
                HBaseFilters.labelReg(GradoopHBaseTestBase.PATTERN_EDGE)
                    .or(HBaseFilters.propLargerThan(GradoopHBaseTestBase.PROP_SINCE, 2015, true).negate())))
            .applyVertexPredicate(Query.elements().fromAll().where(
                HBaseFilters.labelIn(new String[]{"Person"})
                    .and(HBaseFilters.propEquals(GradoopHBaseTestBase.PROP_NAME, expectedVertices.get(0).getPropertyValue(GradoopHBaseTestBase.PROP_NAME))
                        .or(HBaseFilters.propEquals(GradoopHBaseTestBase.PROP_NAME, expectedVertices.get(1).getPropertyValue(GradoopHBaseTestBase.PROP_NAME))
                            .or(HBaseFilters.propEquals(GradoopHBaseTestBase.PROP_NAME, expectedVertices.get(2).getPropertyValue(GradoopHBaseTestBase.PROP_NAME)))))));
        Assert.assertTrue(source.isFilterPushedDown());

        GraphCollection graphCollection = source.getGraphCollection();
        List<GraphHead> loadedGraphHeads = graphCollection.getGraphHeads().collect();
        List<Vertex> loadedVertices = graphCollection.getVertices().collect();
        List<Edge> loadedEdges = graphCollection.getEdges().collect();

        Assert.assertEquals(1L, loadedGraphHeads.size());
        Assert.assertEquals(21L, loadedEdges.size());
        Assert.assertEquals(3L, loadedVertices.size());

        validateAgainstExpected(
            expectedGraphHeads, loadedGraphHeads,
            expectedVertices, loadedVertices,
            expectedEdges, loadedEdges);
    }

    /**
     * Loads the social network GDL resource, writes it through an {@link HBaseDataSink} into a
     * freshly created store and compares the store content with the loader's collections.
     */
    @Test
    public void testWriteToSink() throws Exception {
        HBaseEPGMStore targetStore = GradoopHBaseTestBase.createEmptyEPGMStore("testWriteToSink");
        // The stream stays open for the whole test body so closing it cannot interfere with the
        // loader; the store is closed afterwards, as tearDown() does for the shared store.
        try (InputStream gdlStream = getClass().getResourceAsStream("/data/gdl/social_network.gdl")) {
            FlinkAsciiGraphLoader loader = new FlinkAsciiGraphLoader(this.config);
            loader.initDatabaseFromStream(gdlStream);

            GradoopFlinkConfig flinkConfig = GradoopFlinkConfig.createConfig(getExecutionEnvironment());
            new HBaseDataSink(targetStore, flinkConfig).write(
                flinkConfig.getGraphCollectionFactory().fromCollections(
                    loader.getGraphHeads(), loader.getVertices(), loader.getEdges()));
            getExecutionEnvironment().execute();
            targetStore.flush();

            GradoopTestUtils.validateEPGMElementCollections(loader.getGraphHeads(), targetStore.getGraphSpace().readRemainsAndClose());
            GradoopTestUtils.validateEPGMElementCollections(loader.getVertices(), targetStore.getVertexSpace().readRemainsAndClose());
            GradoopTestUtils.validateEPGMGraphElementCollections(loader.getVertices(), targetStore.getVertexSpace().readRemainsAndClose());
            GradoopTestUtils.validateEPGMElementCollections(loader.getEdges(), targetStore.getEdgeSpace().readRemainsAndClose());
            GradoopTestUtils.validateEPGMGraphElementCollections(loader.getEdges(), targetStore.getEdgeSpace().readRemainsAndClose());
        } finally {
            targetStore.close();
        }
    }

    /**
     * Writing with the overwrite flag set to {@code true} is expected to throw
     * {@link NotImplementedException}.
     */
    @Test(expected = NotImplementedException.class)
    public void testWriteToSinkWithOverWrite() throws Exception {
        HBaseEPGMStore targetStore = GradoopHBaseTestBase.createEmptyEPGMStore("testWriteToSink");
        try {
            GradoopFlinkConfig flinkConfig = GradoopFlinkConfig.createConfig(getExecutionEnvironment());
            new HBaseDataSink(targetStore, flinkConfig).write(flinkConfig.getGraphCollectionFactory().createEmptyCollection(), true);
            getExecutionEnvironment().execute();
        } finally {
            // Release the store even though the write above is expected to throw.
            targetStore.close();
        }
    }

    /**
     * Builds a data source on the shared store with a freshly created Flink configuration.
     *
     * @return a new source without any predicate applied
     */
    private HBaseDataSource createSourceOnSharedStore() throws Exception {
        return new HBaseDataSource(this.epgmStore, GradoopFlinkConfig.createConfig(getExecutionEnvironment()));
    }

    /**
     * Materializes the collection through local collection output formats, executes the Flink
     * job and validates the result against the complete social graph.
     *
     * @param graphCollection collection produced by the source under test
     */
    private void assertSourceYieldsWholeSocialGraph(GraphCollection graphCollection) throws Exception {
        List<GraphHead> loadedGraphHeads = Lists.newArrayList();
        List<Vertex> loadedVertices = Lists.newArrayList();
        List<Edge> loadedEdges = Lists.newArrayList();

        graphCollection.getGraphHeads().output(new LocalCollectionOutputFormat<>(loadedGraphHeads));
        graphCollection.getVertices().output(new LocalCollectionOutputFormat<>(loadedVertices));
        graphCollection.getEdges().output(new LocalCollectionOutputFormat<>(loadedEdges));
        getExecutionEnvironment().execute();

        validateAgainstExpected(
            GradoopHBaseTestBase.getSocialGraphHeads(), loadedGraphHeads,
            GradoopHBaseTestBase.getSocialVertices(), loadedVertices,
            GradoopHBaseTestBase.getSocialEdges(), loadedEdges);
    }

    /**
     * Runs the validation sequence shared by the read tests: element validation for graph
     * heads, then element and graph-element validation for vertices and for edges.
     *
     * @param expectedGraphHeads graph heads the source should have returned
     * @param loadedGraphHeads   graph heads the source actually returned
     * @param expectedVertices   vertices the source should have returned
     * @param loadedVertices     vertices the source actually returned
     * @param expectedEdges      edges the source should have returned
     * @param loadedEdges        edges the source actually returned
     */
    private static void validateAgainstExpected(
            Collection<GraphHead> expectedGraphHeads, Collection<GraphHead> loadedGraphHeads,
            Collection<Vertex> expectedVertices, Collection<Vertex> loadedVertices,
            Collection<Edge> expectedEdges, Collection<Edge> loadedEdges) throws Exception {
        GradoopTestUtils.validateEPGMElementCollections(expectedGraphHeads, loadedGraphHeads);
        GradoopTestUtils.validateEPGMElementCollections(expectedVertices, loadedVertices);
        GradoopTestUtils.validateEPGMGraphElementCollections(expectedVertices, loadedVertices);
        GradoopTestUtils.validateEPGMElementCollections(expectedEdges, loadedEdges);
        GradoopTestUtils.validateEPGMGraphElementCollections(expectedEdges, loadedEdges);
    }
}
