TINKERPOP-2857 add GraphFilter support for GraphSONRecordReader
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
index da9561e..695e55d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
@@ -26,20 +26,23 @@
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
 import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.Mapper;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONReader;
-import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONXModuleV2d0;
-import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONXModuleV3d0;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONVersion;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.TypeInfo;
 import org.apache.tinkerpop.gremlin.structure.io.util.IoRegistryHelper;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Optional;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -50,6 +53,7 @@
     private final VertexWritable vertexWritable = new VertexWritable();
     private final LineRecordReader lineRecordReader;
     private boolean hasEdges;
+    private GraphFilter graphFilter = null;
 
     public GraphSONRecordReader() {
         this.lineRecordReader = new LineRecordReader();
@@ -59,6 +63,10 @@
     public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException {
         this.lineRecordReader.initialize(genericSplit, context);
         this.hasEdges = context.getConfiguration().getBoolean(Constants.GREMLIN_HADOOP_GRAPH_READER_HAS_EDGES, true);
+        if (context.getConfiguration().get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null) {
+            graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(context.getConfiguration()),
+                    Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+        }
         final GraphSONVersion graphSONVersion =
                 GraphSONVersion.valueOf(context.getConfiguration().get(Constants.GREMLIN_HADOOP_GRAPHSON_VERSION, "V3_0"));
         final Mapper mapper = GraphSONMapper.build().
@@ -72,15 +80,24 @@
 
     @Override
     public boolean nextKeyValue() throws IOException {
-        if (!this.lineRecordReader.nextKeyValue())
-            return false;
-
-        try (InputStream in = new ByteArrayInputStream(this.lineRecordReader.getCurrentValue().getBytes())) {
-            this.vertexWritable.set(this.hasEdges ?
-                    this.graphsonReader.readVertex(in, Attachable::get, Attachable::get, Direction.BOTH) :
-                    this.graphsonReader.readVertex(in, Attachable::get));
-            return true;
+        while(this.lineRecordReader.nextKeyValue()) {
+            try (InputStream in = new ByteArrayInputStream(this.lineRecordReader.getCurrentValue().getBytes())) {
+                Vertex vertex = this.hasEdges ?
+                        this.graphsonReader.readVertex(in, Attachable::get, Attachable::get, Direction.BOTH) :
+                        this.graphsonReader.readVertex(in, Attachable::get);
+                this.vertexWritable.set(vertex);
+                if (graphFilter == null) {
+                    return true;
+                } else {
+                    final Optional<StarGraph.StarVertex> vertexWritable = this.vertexWritable.get().applyGraphFilter(graphFilter);
+                    if (vertexWritable.isPresent()) {
+                        this.vertexWritable.set(vertexWritable.get());
+                        return true;
+                    }
+                }
+            }
         }
+        return false;
     }
 
     @Override
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
index 394aa6f..64354e9 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
+import org.apache.commons.configuration2.BaseConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,7 +34,12 @@
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tinkerpop.gremlin.TestHelper;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -83,6 +89,16 @@
         }
     }
 
+    protected Configuration configureWithFilter(Configuration config) {
+        GraphFilter filter = new GraphFilter();
+        filter.setVertexFilter(__.hasLabel("artist"));
+        String labelKey = "filter";
+        final BaseConfiguration filterConfig = new BaseConfiguration();
+        VertexProgramHelper.serialize(filter, filterConfig, labelKey);
+        config.set(Constants.GREMLIN_HADOOP_GRAPH_FILTER, filterConfig.getProperty(labelKey).toString());
+        return config;
+    }
+
     protected Configuration configure(final File outputDirectory) {
         final Configuration configuration = new Configuration(false);
         configuration.set("fs.file.impl", LocalFileSystem.class.getName());
@@ -115,6 +131,12 @@
         final OutputFormat<NullWritable, VertexWritable> outputFormat = outFormatClass.isPresent() ? ReflectionUtils.newInstance(outFormatClass.get(), configuration) : null;
         final RecordWriter<NullWritable, VertexWritable> writer = null == outputFormat ? null : outputFormat.getRecordWriter(job);
 
+        GraphFilter graphFilter = null;
+        if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null) {
+            graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration),
+                    Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+        }
+
         boolean foundKeyValue = false;
         for (final FileSplit split : fileSplits) {
             logger.info("\treading file split {}", split.getPath().getName() + " ({}", split.getStart() + "..." + (split.getStart() + split.getLength()), "{} {} bytes)");
@@ -127,6 +149,10 @@
                 assertTrue(progress >= lastProgress);
                 assertEquals(NullWritable.class, reader.getCurrentKey().getClass());
                 final VertexWritable vertexWritable = (VertexWritable) reader.getCurrentValue();
+                if (graphFilter != null) {
+                    // check whether the filter takes effect
+                    assertTrue(vertexWritable.get().applyGraphFilter(graphFilter).isPresent());
+                }
                 if (null != writer) writer.write(NullWritable.get(), vertexWritable);
                 vertexCount++;
                 outEdgeCount = outEdgeCount + (int) IteratorUtils.count(vertexWritable.get().edges(Direction.OUT));
@@ -147,13 +173,15 @@
             }
         }
 
-        if (getInputFilename().startsWith("grateful-dead-v")) {
+        if (getInputFilename().startsWith("grateful-dead-v") && graphFilter == null) {
             assertEquals(8049, outEdgeCount);
             assertEquals(8049, inEdgeCount);
             assertEquals(808, vertexCount);
             assertTrue(foundKeyValue);
         }
-        assertEquals(outEdgeCount, inEdgeCount);
+        if (graphFilter == null) {
+            assertEquals(outEdgeCount, inEdgeCount);
+        }
 
         if (null != writer) {
             writer.close(new TaskAttemptContextImpl(configuration, job.getTaskAttemptID()));
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONV3d0RecordReaderFilterTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONV3d0RecordReaderFilterTest.java
new file mode 100644
index 0000000..882821b
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONV3d0RecordReaderFilterTest.java
@@ -0,0 +1,51 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.RecordReaderWriterTest;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.File;
+
+public class GraphSONV3d0RecordReaderFilterTest extends RecordReaderWriterTest {
+
+    @Override
+    protected String getInputFilename() {
+        return "grateful-dead-v3d0.json";
+    }
+
+    @Override
+    protected Class<? extends InputFormat<NullWritable, VertexWritable>> getInputFormat() {
+        return GraphSONInputFormat.class;
+    }
+
+    @Override
+    protected Class<? extends OutputFormat<NullWritable, VertexWritable>> getOutputFormat() {
+        return GraphSONOutputFormat.class;
+    }
+
+    protected Configuration configure(final File outputDirectory) {
+        Configuration config = super.configure(outputDirectory);
+        return configureWithFilter(config);
+    }
+}
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReaderFilterTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReaderFilterTest.java
new file mode 100644
index 0000000..7867031
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReaderFilterTest.java
@@ -0,0 +1,52 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.RecordReaderWriterTest;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+
+import java.io.File;
+
+public class GryoRecordReaderFilterTest extends RecordReaderWriterTest {
+    @Override
+    protected String getInputFilename() {
+        return "grateful-dead-v3d0.kryo";
+    }
+
+    @Override
+    protected Class<? extends InputFormat<NullWritable, VertexWritable>> getInputFormat() {
+        return GryoInputFormat.class;
+    }
+
+    @Override
+    protected Class<? extends OutputFormat<NullWritable, VertexWritable>> getOutputFormat() {
+        return GryoOutputFormat.class;
+    }
+
+    protected Configuration configure(final File outputDirectory) {
+        Configuration config = super.configure(outputDirectory);
+        return configureWithFilter(config);
+    }
+}