Merge branch '3.6-dev'
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index d560b0c..d75f7ff 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -249,6 +249,7 @@
* Fixed issue where the `GremlinGroovyScriptEngine` reused the same translator concurrently which lead to incorrect translations.
* Fixed bug where tasks that haven't started running yet time out due to `evaluationTimeout` and never send a response back to the client.
* Set the exact exception in `initializationFailure` on the Java driver instead of the root cause.
+* Added `SparkIOUtil` utility to load a graph into a Spark RDD.
[[release-3-5-4]]
=== TinkerPop 3.5.4 (Release Date: July 18, 2022)
diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc
index 5b40b15..9023f14 100644
--- a/docs/src/upgrade/release-3.5.x.asciidoc
+++ b/docs/src/upgrade/release-3.5.x.asciidoc
@@ -41,6 +41,11 @@
=== Upgrading for Users
+==== SparkIOUtil utility
+
+A utility class `SparkIOUtil` is introduced, which allows users to load a graph into a Spark RDD via the
+`loadVertices(org.apache.commons.configuration2.Configuration, JavaSparkContext)` method.
+
==== Gremlin-Go
After introducing a number of release candidates over the past several months, the official release of Gremlin-Go is now
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index d364b3d..9b7a012 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -68,6 +68,7 @@
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
+import org.apache.tinkerpop.gremlin.spark.structure.io.SparkIOUtil;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
import org.apache.tinkerpop.gremlin.structure.Direction;
@@ -282,28 +283,26 @@
}
}
}
- final InputRDD inputRDD;
- final OutputRDD outputRDD;
+ final InputRDD inputRDD = SparkIOUtil.createInputRDD(hadoopConfiguration);
final boolean filtered;
+ // if the input class can filter on load, then set the filters
+ if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class, InputFormat.class))) {
+ GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
+ filtered = false;
+ } else if (inputRDD instanceof GraphFilterAware) {
+ ((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
+ filtered = false;
+ } else if (this.graphFilter.hasFilter()) {
+ filtered = true;
+ } else {
+ filtered = false;
+ }
+
+ final OutputRDD outputRDD;
try {
- inputRDD = InputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class)) ?
- hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputRDD.class, InputRDD.class).newInstance() :
- InputFormatRDD.class.newInstance();
outputRDD = OutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class)) ?
hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, OutputRDD.class, OutputRDD.class).newInstance() :
OutputFormatRDD.class.newInstance();
- // if the input class can filter on load, then set the filters
- if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class, InputFormat.class))) {
- GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
- filtered = false;
- } else if (inputRDD instanceof GraphFilterAware) {
- ((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
- filtered = false;
- } else if (this.graphFilter.hasFilter()) {
- filtered = true;
- } else {
- filtered = false;
- }
} catch (final InstantiationException | IllegalAccessException e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -331,7 +330,7 @@
updateLocalConfiguration(sparkContext, hadoopConfiguration);
// create a message-passing friendly rdd from the input rdd
boolean partitioned = false;
- JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(graphComputerConfiguration, sparkContext);
+ JavaPairRDD<Object, VertexWritable> loadedGraphRDD = SparkIOUtil.loadVertices(inputRDD, graphComputerConfiguration, sparkContext);
// if there are vertex or edge filters, filter the loaded graph rdd prior to partitioning and persisting
if (filtered) {
this.logger.debug("Filtering the loaded graphRDD: " + this.graphFilter);
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIOUtil.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIOUtil.java
new file mode 100644
index 0000000..1415b75
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIOUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to load graph as Spark RDDs
+ *
+ * @author Boxuan Li (https://li-boxuan.com)
+ */
+public class SparkIOUtil {
+ private static final Logger logger = LoggerFactory.getLogger(SparkIOUtil.class);
+
+ /**
+ * Load graph into Spark RDDs. Note: Graph configurations must include {@value Constants#GREMLIN_HADOOP_GRAPH_READER}.
+ * <p>
+ * See the example below:
+ * <pre>
+ * SparkConf sparkConf = new SparkConf().setAppName("Spark Graph")
+ * .set(SparkLauncher.SPARK_MASTER, "local[4]")
+ * .set(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+ * JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
+ *
+ * Configuration sparkGraphConfiguration = new BaseConfiguration();
+ * sparkGraphConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+ * sparkGraphConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
+ * SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern-v3d0.kryo"));
+ *
+ * // load vertices
+ * JavaPairRDD verticesRDD = SparkIOUtil.loadVertices(sparkGraphConfiguration, sparkContext);
+ * </pre>
+ *
+ * @param sparkGraphConfiguration graph configurations
+ * @param sparkContext a JavaSparkContext instance
+ * @return vertices in Spark RDD
+ */
+ public static JavaPairRDD<Object, VertexWritable> loadVertices(
+ final org.apache.commons.configuration2.Configuration sparkGraphConfiguration,
+ final JavaSparkContext sparkContext) {
+ assert sparkGraphConfiguration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_READER);
+ logger.debug("Loading vertices into Spark RDD...");
+ final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(sparkGraphConfiguration);
+ return loadVertices(createInputRDD(hadoopConfiguration), sparkGraphConfiguration, sparkContext);
+ }
+
+ /**
+ * Load graph into Spark RDDs
+ *
+ * @param inputRDD an InputRDD instance
+ * @param sparkGraphConfiguration graph configurations
+ * @param sparkContext a JavaSparkContext instance
+ * @return vertices in Spark RDD
+ */
+ public static JavaPairRDD<Object, VertexWritable> loadVertices(
+ final InputRDD inputRDD,
+ final org.apache.commons.configuration2.Configuration sparkGraphConfiguration,
+ JavaSparkContext sparkContext) {
+ JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(sparkGraphConfiguration, sparkContext);
+ return loadedGraphRDD;
+ }
+
+ /**
+ * Create an InputRDD instance based on {@value Constants#GREMLIN_HADOOP_GRAPH_READER} config
+ * <p>
+ * If the class configured under {@value Constants#GREMLIN_HADOOP_GRAPH_READER} is assignable to {@link InputRDD},
+ * that class is instantiated; otherwise, an {@link InputFormatRDD} instance is instantiated.
+ *
+ * @param hadoopConfiguration hadoop configurations
+ * @return generated InputRDD instance
+ */
+ public static InputRDD createInputRDD(final Configuration hadoopConfiguration) {
+ final InputRDD inputRDD;
+ try {
+ inputRDD = InputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class)) ?
+ hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputRDD.class, InputRDD.class).newInstance() :
+ InputFormatRDD.class.newInstance();
+ } catch (final InstantiationException | IllegalAccessException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ return inputRDD;
+ }
+}
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkRDDLoadTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkRDDLoadTest.java
new file mode 100644
index 0000000..f7955aa
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkRDDLoadTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration2.BaseConfiguration;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.tinkerpop.gremlin.features.TestFiles;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Boxuan Li (https://li-boxuan.com)
+ */
+public class SparkRDDLoadTest extends AbstractSparkTest {
+ @Test
+ public void shouldLoadVerticesRDDWithInputRDD() {
+ final JavaSparkContext sparkContext = getSparkContext();
+
+ // load vertices with inputRDD config
+ final Configuration sparkGraphConfiguration = new BaseConfiguration();
+ sparkGraphConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, ExampleInputRDD.class.getCanonicalName());
+ JavaPairRDD<Object, VertexWritable> verticesRDD = SparkIOUtil.loadVertices(sparkGraphConfiguration, sparkContext);
+ assertEquals(4, verticesRDD.count());
+ assertEquals(123, verticesRDD.values()
+ .map(vertexWritable -> (int) vertexWritable.get().values("age").next())
+ .reduce(Integer::sum).longValue());
+
+ // load vertices with inputRDD object
+ verticesRDD = SparkIOUtil.loadVertices(new ExampleInputRDD(), new BaseConfiguration(), sparkContext);
+ assertEquals(4, verticesRDD.count());
+ assertEquals(123, verticesRDD.values()
+ .map(vertexWritable -> (int) vertexWritable.get().values("age").next())
+ .reduce(Integer::sum).longValue());
+
+ sparkContext.stop();
+ }
+
+ @Test
+ public void shouldLoadVerticesRDDWithInputFormat() {
+ final JavaSparkContext sparkContext = getSparkContext();
+
+ final Configuration sparkGraphConfiguration = new BaseConfiguration();
+ sparkGraphConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+ sparkGraphConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
+ TestFiles.PATHS.get("tinkerpop-modern-v3d0.kryo"));
+
+ // load vertices
+ JavaPairRDD<Object, VertexWritable> verticesRDD = SparkIOUtil.loadVertices(sparkGraphConfiguration, sparkContext);
+ assertEquals(6, verticesRDD.count());
+
+ sparkContext.stop();
+ }
+
+ private JavaSparkContext getSparkContext() {
+ SparkConf sparkConf = new SparkConf()
+ .setAppName("Spark Graph")
+ .set(SparkLauncher.SPARK_MASTER, "local[4]")
+ .set(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+ return new JavaSparkContext(sparkConf);
+ }
+}