Merge branch '3.6-dev'
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index d560b0c..d75f7ff 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -249,6 +249,7 @@
 * Fixed issue where the `GremlinGroovyScriptEngine` reused the same translator concurrently which lead to incorrect translations.
 * Fixed bug where tasks that haven't started running yet time out due to `evaluationTimeout` and never send a response back to the client.
 * Set the exact exception in `initializationFailure` on the Java driver instead of the root cause.
+* Added `SparkIOUtil` utility to load a graph into a Spark RDD.
 
 [[release-3-5-4]]
 === TinkerPop 3.5.4 (Release Date: July 18, 2022)
diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc
index 5b40b15..9023f14 100644
--- a/docs/src/upgrade/release-3.5.x.asciidoc
+++ b/docs/src/upgrade/release-3.5.x.asciidoc
@@ -41,6 +41,11 @@
 
 === Upgrading for Users
 
+==== SparkIOUtil utility
+
+A utility class `SparkIOUtil` has been introduced, which allows users to load a graph into a Spark RDD via the
+`loadVertices(org.apache.commons.configuration2.Configuration, JavaSparkContext)` method.
+
 ==== Gremlin-Go
 
 After introducing a number of release candidates over the past several months, the official release of Gremlin-Go is now
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index d364b3d..9b7a012 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -68,6 +68,7 @@
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
+import org.apache.tinkerpop.gremlin.spark.structure.io.SparkIOUtil;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
 import org.apache.tinkerpop.gremlin.structure.Direction;
@@ -282,28 +283,26 @@
                     }
                 }
             }
-            final InputRDD inputRDD;
-            final OutputRDD outputRDD;
+            final InputRDD inputRDD = SparkIOUtil.createInputRDD(hadoopConfiguration);
             final boolean filtered;
+            // if the input class can filter on load, then set the filters
+            if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class, InputFormat.class))) {
+                GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
+                filtered = false;
+            } else if (inputRDD instanceof GraphFilterAware) {
+                ((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
+                filtered = false;
+            } else if (this.graphFilter.hasFilter()) {
+                filtered = true;
+            } else {
+                filtered = false;
+            }
+
+            final OutputRDD outputRDD;
             try {
-                inputRDD = InputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class)) ?
-                        hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputRDD.class, InputRDD.class).newInstance() :
-                        InputFormatRDD.class.newInstance();
                 outputRDD = OutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class)) ?
                         hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, OutputRDD.class, OutputRDD.class).newInstance() :
                         OutputFormatRDD.class.newInstance();
-                // if the input class can filter on load, then set the filters
-                if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class, InputFormat.class))) {
-                    GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
-                    filtered = false;
-                } else if (inputRDD instanceof GraphFilterAware) {
-                    ((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
-                    filtered = false;
-                } else if (this.graphFilter.hasFilter()) {
-                    filtered = true;
-                } else {
-                    filtered = false;
-                }
             } catch (final InstantiationException | IllegalAccessException e) {
                 throw new IllegalStateException(e.getMessage(), e);
             }
@@ -331,7 +330,7 @@
                 updateLocalConfiguration(sparkContext, hadoopConfiguration);
                 // create a message-passing friendly rdd from the input rdd
                 boolean partitioned = false;
-                JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(graphComputerConfiguration, sparkContext);
+                JavaPairRDD<Object, VertexWritable> loadedGraphRDD = SparkIOUtil.loadVertices(inputRDD, graphComputerConfiguration, sparkContext);
                 // if there are vertex or edge filters, filter the loaded graph rdd prior to partitioning and persisting
                 if (filtered) {
                     this.logger.debug("Filtering the loaded graphRDD: " + this.graphFilter);
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIOUtil.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIOUtil.java
new file mode 100644
index 0000000..1415b75
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIOUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to load graph as Spark RDDs
+ *
+ * @author Boxuan Li (https://li-boxuan.com)
+ */
+public class SparkIOUtil {
+    private static final Logger logger = LoggerFactory.getLogger(SparkIOUtil.class);
+
+    /**
+     * Load graph into Spark RDDs. Note: Graph configurations must include {@value Constants#GREMLIN_HADOOP_GRAPH_READER}.
+     * <p>
+     * See Example below:
+     * <pre>
+     *     SparkConf sparkConf = new SparkConf().setAppName("Spark Graph")
+     *         .set(SparkLauncher.SPARK_MASTER, "local[4]")
+     *         .set(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+     *     JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
+     *
+     *     Configuration sparkGraphConfiguration = new BaseConfiguration();
+     *     sparkGraphConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+     *     sparkGraphConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
+     *         SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern-v3d0.kryo"));
+     *
+     *     // load vertices
+     *     JavaPairRDD verticesRDD = SparkIOUtil.loadVertices(sparkGraphConfiguration, sparkContext);
+     * </pre>
+     *
+     * @param sparkGraphConfiguration graph configurations
+     * @param sparkContext            a JavaSparkContext instance
+     * @return vertices in Spark RDD
+     * @throws IllegalArgumentException if {@value Constants#GREMLIN_HADOOP_GRAPH_READER} is not configured
+     */
+    public static JavaPairRDD<Object, VertexWritable> loadVertices(
+        final org.apache.commons.configuration2.Configuration sparkGraphConfiguration,
+        final JavaSparkContext sparkContext) {
+        if (!sparkGraphConfiguration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_READER)) {
+            throw new IllegalArgumentException("Graph configuration must include " + Constants.GREMLIN_HADOOP_GRAPH_READER);
+        }
+        logger.debug("Loading vertices into Spark RDD...");
+        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(sparkGraphConfiguration);
+        return loadVertices(createInputRDD(hadoopConfiguration), sparkGraphConfiguration, sparkContext);
+    }
+
+    /**
+     * Load graph into Spark RDDs by calling {@code readGraphRDD} on the supplied {@link InputRDD}.
+     *
+     * @param inputRDD                an InputRDD instance
+     * @param sparkGraphConfiguration graph configurations
+     * @param sparkContext            a JavaSparkContext instance
+     * @return vertices in Spark RDD
+     */
+    public static JavaPairRDD<Object, VertexWritable> loadVertices(
+        final InputRDD inputRDD,
+        final org.apache.commons.configuration2.Configuration sparkGraphConfiguration,
+        final JavaSparkContext sparkContext) {
+        return inputRDD.readGraphRDD(sparkGraphConfiguration, sparkContext);
+    }
+
+    /**
+     * Create an InputRDD instance based on {@value Constants#GREMLIN_HADOOP_GRAPH_READER} config.
+     * <p>
+     * If the configured class is an {@link InputRDD}, it is instantiated; otherwise an {@link InputFormatRDD} is.
+     *
+     * @param hadoopConfiguration hadoop configurations
+     * @return generated InputRDD instance
+     * @throws IllegalStateException if the chosen class cannot be instantiated or accessed
+     */
+    public static InputRDD createInputRDD(final Configuration hadoopConfiguration) {
+        try {
+            return InputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class)) ?
+                hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputRDD.class, InputRDD.class).newInstance() :
+                InputFormatRDD.class.newInstance();
+        } catch (final InstantiationException | IllegalAccessException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkRDDLoadTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkRDDLoadTest.java
new file mode 100644
index 0000000..f7955aa
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkRDDLoadTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration2.BaseConfiguration;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.tinkerpop.gremlin.features.TestFiles;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Boxuan Li (https://li-boxuan.com)
+ */
+public class SparkRDDLoadTest extends AbstractSparkTest {
+    @Test
+    public void shouldLoadVerticesRDDWithInputRDD() {
+        final JavaSparkContext context = getSparkContext();
+        // first pass: the InputRDD class is named in the graph configuration
+        final Configuration graphConfiguration = new BaseConfiguration();
+        graphConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, ExampleInputRDD.class.getCanonicalName());
+        final JavaPairRDD<Object, VertexWritable> fromConfiguration = SparkIOUtil.loadVertices(graphConfiguration, context);
+        assertEquals(4, fromConfiguration.count());
+        assertEquals(123, sumOfAges(fromConfiguration));
+
+        // second pass: an InputRDD instance is handed over directly
+        final JavaPairRDD<Object, VertexWritable> fromInstance =
+            SparkIOUtil.loadVertices(new ExampleInputRDD(), new BaseConfiguration(), context);
+        assertEquals(4, fromInstance.count());
+        assertEquals(123, sumOfAges(fromInstance));
+
+        context.stop();
+    }
+
+    @Test
+    public void shouldLoadVerticesRDDWithInputFormat() {
+        final JavaSparkContext context = getSparkContext();
+        final Configuration graphConfiguration = new BaseConfiguration();
+        graphConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+        graphConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
+                TestFiles.PATHS.get("tinkerpop-modern-v3d0.kryo"));
+
+        // load vertices through the configured GryoInputFormat reader
+        final JavaPairRDD<Object, VertexWritable> vertices = SparkIOUtil.loadVertices(graphConfiguration, context);
+        assertEquals(6, vertices.count());
+
+        context.stop();
+    }
+
+    // sums the "age" value read from every vertex in the given RDD
+    private static long sumOfAges(final JavaPairRDD<Object, VertexWritable> vertices) {
+        return vertices.values().map(writable -> (int) writable.get().values("age").next())
+            .reduce(Integer::sum).longValue();
+    }
+
+    // builds a Spark context configured with a "local[4]" master and the Gryo serializer
+    private JavaSparkContext getSparkContext() {
+        final SparkConf conf = new SparkConf().setAppName("Spark Graph")
+            .set(SparkLauncher.SPARK_MASTER, "local[4]")
+            .set(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+        return new JavaSparkContext(conf);
+    }
+}