TINKERPOP-2857 add GraphFilter support for GraphSONRecordReader
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
index da9561e..695e55d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java
@@ -26,20 +26,23 @@
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.Mapper;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONReader;
-import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONXModuleV2d0;
-import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONXModuleV3d0;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONVersion;
import org.apache.tinkerpop.gremlin.structure.io.graphson.TypeInfo;
import org.apache.tinkerpop.gremlin.structure.io.util.IoRegistryHelper;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Optional;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -50,6 +53,7 @@
private final VertexWritable vertexWritable = new VertexWritable();
private final LineRecordReader lineRecordReader;
private boolean hasEdges;
+ private GraphFilter graphFilter = null;
public GraphSONRecordReader() {
this.lineRecordReader = new LineRecordReader();
@@ -59,6 +63,10 @@
public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException {
this.lineRecordReader.initialize(genericSplit, context);
this.hasEdges = context.getConfiguration().getBoolean(Constants.GREMLIN_HADOOP_GRAPH_READER_HAS_EDGES, true);
+ if (context.getConfiguration().get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null) {
+ graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(context.getConfiguration()),
+ Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+ }
final GraphSONVersion graphSONVersion =
GraphSONVersion.valueOf(context.getConfiguration().get(Constants.GREMLIN_HADOOP_GRAPHSON_VERSION, "V3_0"));
final Mapper mapper = GraphSONMapper.build().
@@ -72,15 +80,24 @@
@Override
public boolean nextKeyValue() throws IOException {
- if (!this.lineRecordReader.nextKeyValue())
- return false;
-
- try (InputStream in = new ByteArrayInputStream(this.lineRecordReader.getCurrentValue().getBytes())) {
- this.vertexWritable.set(this.hasEdges ?
- this.graphsonReader.readVertex(in, Attachable::get, Attachable::get, Direction.BOTH) :
- this.graphsonReader.readVertex(in, Attachable::get));
- return true;
+ while(this.lineRecordReader.nextKeyValue()) {
+ try (InputStream in = new ByteArrayInputStream(this.lineRecordReader.getCurrentValue().getBytes())) {
+ Vertex vertex = this.hasEdges ?
+ this.graphsonReader.readVertex(in, Attachable::get, Attachable::get, Direction.BOTH) :
+ this.graphsonReader.readVertex(in, Attachable::get);
+ this.vertexWritable.set(vertex);
+ if (graphFilter == null) {
+ return true;
+ } else {
+ final Optional<StarGraph.StarVertex> vertexWritable = this.vertexWritable.get().applyGraphFilter(graphFilter);
+ if (vertexWritable.isPresent()) {
+ this.vertexWritable.set(vertexWritable.get());
+ return true;
+ }
+ }
+ }
}
+ return false;
}
@Override
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
index 394aa6f..64354e9 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+import org.apache.commons.configuration2.BaseConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,7 +34,12 @@
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tinkerpop.gremlin.TestHelper;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -83,6 +89,16 @@
}
}
+ protected Configuration configureWithFilter(Configuration config) {
+ GraphFilter filter = new GraphFilter();
+ filter.setVertexFilter(__.hasLabel("artist"));
+ String labelKey = "filter";
+ final BaseConfiguration filterConfig = new BaseConfiguration();
+ VertexProgramHelper.serialize(filter, filterConfig, labelKey);
+ config.set(Constants.GREMLIN_HADOOP_GRAPH_FILTER, filterConfig.getProperty(labelKey).toString());
+ return config;
+ }
+
protected Configuration configure(final File outputDirectory) {
final Configuration configuration = new Configuration(false);
configuration.set("fs.file.impl", LocalFileSystem.class.getName());
@@ -115,6 +131,12 @@
final OutputFormat<NullWritable, VertexWritable> outputFormat = outFormatClass.isPresent() ? ReflectionUtils.newInstance(outFormatClass.get(), configuration) : null;
final RecordWriter<NullWritable, VertexWritable> writer = null == outputFormat ? null : outputFormat.getRecordWriter(job);
+ GraphFilter graphFilter = null;
+ if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null) {
+ graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration),
+ Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+ }
+
boolean foundKeyValue = false;
for (final FileSplit split : fileSplits) {
logger.info("\treading file split {}", split.getPath().getName() + " ({}", split.getStart() + "..." + (split.getStart() + split.getLength()), "{} {} bytes)");
@@ -127,6 +149,10 @@
assertTrue(progress >= lastProgress);
assertEquals(NullWritable.class, reader.getCurrentKey().getClass());
final VertexWritable vertexWritable = (VertexWritable) reader.getCurrentValue();
+ if (graphFilter != null) {
+ // check whether the filter takes effect
+ assertTrue(vertexWritable.get().applyGraphFilter(graphFilter).isPresent());
+ }
if (null != writer) writer.write(NullWritable.get(), vertexWritable);
vertexCount++;
outEdgeCount = outEdgeCount + (int) IteratorUtils.count(vertexWritable.get().edges(Direction.OUT));
@@ -147,13 +173,15 @@
}
}
- if (getInputFilename().startsWith("grateful-dead-v")) {
+ if (getInputFilename().startsWith("grateful-dead-v") && graphFilter == null) {
assertEquals(8049, outEdgeCount);
assertEquals(8049, inEdgeCount);
assertEquals(808, vertexCount);
assertTrue(foundKeyValue);
}
- assertEquals(outEdgeCount, inEdgeCount);
+ if (graphFilter == null) {
+ assertEquals(outEdgeCount, inEdgeCount);
+ }
if (null != writer) {
writer.close(new TaskAttemptContextImpl(configuration, job.getTaskAttemptID()));
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONV3d0RecordReaderFilterTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONV3d0RecordReaderFilterTest.java
new file mode 100644
index 0000000..882821b
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONV3d0RecordReaderFilterTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.RecordReaderWriterTest;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.File;
+
+public class GraphSONV3d0RecordReaderFilterTest extends RecordReaderWriterTest {
+
+ @Override
+ protected String getInputFilename() {
+ return "grateful-dead-v3d0.json";
+ }
+
+ @Override
+ protected Class<? extends InputFormat<NullWritable, VertexWritable>> getInputFormat() {
+ return GraphSONInputFormat.class;
+ }
+
+ @Override
+ protected Class<? extends OutputFormat<NullWritable, VertexWritable>> getOutputFormat() {
+ return GraphSONOutputFormat.class;
+ }
+
+ protected Configuration configure(final File outputDirectory) {
+ Configuration config = super.configure(outputDirectory);
+ return configureWithFilter(config);
+ }
+}
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReaderFilterTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReaderFilterTest.java
new file mode 100644
index 0000000..7867031
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReaderFilterTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.RecordReaderWriterTest;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+
+import java.io.File;
+
+public class GryoRecordReaderFilterTest extends RecordReaderWriterTest {
+ @Override
+ protected String getInputFilename() {
+ return "grateful-dead-v3d0.kryo";
+ }
+
+ @Override
+ protected Class<? extends InputFormat<NullWritable, VertexWritable>> getInputFormat() {
+ return GryoInputFormat.class;
+ }
+
+ @Override
+ protected Class<? extends OutputFormat<NullWritable, VertexWritable>> getOutputFormat() {
+ return GryoOutputFormat.class;
+ }
+
+ protected Configuration configure(final File outputDirectory) {
+ Configuration config = super.configure(outputDirectory);
+ return configureWithFilter(config);
+ }
+}