Merge branch 'dependabot/nuget/gremlin-dotnet/3.5-dev/Microsoft.NET.Test.Sdk-17.3.0' into 3.5-dev
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 22274a3..c959ef4 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -27,6 +27,8 @@
 * Bumped to Apache `commons-configuration` 2.8.0 to fix security vulnerability.
 * Fixed issue where the `GremlinGroovyScriptEngine` reused the same translator concurrently, which led to incorrect translations.
 * Fixed bug where tasks that haven't started running yet time out due to `evaluationTimeout` and never send a response back to the client.
+* Set the exact exception in `initializationFailure` on the Java driver instead of the root cause.
+* Added `SparkIOUtil` utility to load a graph into a Spark RDD.
 
 [[release-3-5-4]]
 === TinkerPop 3.5.4 (Release Date: July 18, 2022)
diff --git a/docs/src/recipes/anti-patterns.asciidoc b/docs/src/recipes/anti-patterns.asciidoc
index 1341f33..839764e 100644
--- a/docs/src/recipes/anti-patterns.asciidoc
+++ b/docs/src/recipes/anti-patterns.asciidoc
@@ -263,3 +263,61 @@
 
 <1> Note that tokens use a `fold()` reducer by default.
 <2> `by("name")` doesn't use a token, but falls into the same category as the String `"name"` is translated into an optimized traversal.
+
+[[has-traversal]]
+== has() and Traversal Arguments
+
+There is an understandable assumption that the `has(String,Traversal)` overload indicates that the value returned by
+the `Traversal` argument will be used as the comparative value for the specified property key. A similar assumption is
+often made that `P` values can take a `Traversal` argument to achieve a similar end, as in
+`has(String, eq(Traversal))`. Unfortunately, neither of these works as assumed.
+
+Starting with the latter issue of `P` and `Traversal`, it should be noted that while `P` values take `Object` and thus
+accept a `Traversal`, that does not mean the `Traversal` will be resolved to a result that can be compared. Rather, `P`
+will compare against the raw `Traversal` object itself, which of course will always return `false` (unless, for some
+odd reason, you happen to store that `Traversal` object in your graph):
+
+[gremlin-groovy,modern]
+----
+g.V().has('name', eq(constant('josh')))
+eq(constant('josh'))
+----
+
+As for the former issue with `has(String,Traversal)`, this requires a bit more explanation. The `Traversal` object is
+meant to be treated as a `Predicate`, meaning that if it returns a value, the `has()` will allow the traverser to pass:
+
+[gremlin-groovy,modern]
+----
+g.V().has('name', constant('josh')) <1>
+g.V().has('name', constant('josh').is('xyz')) <2>
+----
+
+<1> `constant()` always returns a value, so all vertices pass through the `has()`
+<2> By adding `is()`, this `Traversal` no longer returns a value, so no vertices pass through the `has()`
+
+These examples are a bit contrived for the sake of demonstration, but the common pattern folks attempt appears as follows:
+
+[gremlin-groovy,modern]
+----
+g.withSideEffect('x',['name': 'josh']).V().has('name', select('x').select('name'))
+----
+
+The above example represents a commonly seen mistake where we try to dynamically inject the value "josh" from a
+`Map` stored in a side-effect named "x". Since `select('x').select('name')` returns a value, the `has()` unexpectedly
+succeeds for every single vertex. The correct way to do this dynamic injection is with `where()`, as in the following
+example:
+
+[gremlin-groovy,modern]
+----
+g.withSideEffect('x',['name': 'josh']).V().as('a').where('a',eq('x')).by('name')
+----
+
+As a final point on this topic, it's worth noting how `has(String,Traversal)` can be used properly. The traverser that
+starts the `Traversal` argument is the `Property` value being compared. Therefore, if we wanted to find all the
+vertices that had the "name" of "josh" we would do:
+
+[gremlin-groovy,modern]
+----
+g.V().has('name', is('josh'))
+----
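+
+Since the start of the `Traversal` argument is the property value itself, the predicate can be more involved than a
+simple equality check. As a minimal sketch (assuming the modern toy graph), the following finds vertices whose "age"
+is greater than 30:
+
+[gremlin-groovy,modern]
+----
+g.V().has('age', is(gt(30)))
+----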
+
diff --git a/docs/src/reference/the-traversal.asciidoc b/docs/src/reference/the-traversal.asciidoc
index e261323..7c958e6 100644
--- a/docs/src/reference/the-traversal.asciidoc
+++ b/docs/src/reference/the-traversal.asciidoc
@@ -1471,7 +1471,8 @@
 link:++https://tinkerpop.apache.org/javadocs/x.y.z/core/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversal.html#hasValue-org.apache.tinkerpop.gremlin.process.traversal.P-++[`hasValue(P)`],
 link:++https://tinkerpop.apache.org/javadocs/x.y.z/core/org/apache/tinkerpop/gremlin/process/traversal/P.html++[`P`],
 link:++https://tinkerpop.apache.org/javadocs/x.y.z/core/org/apache/tinkerpop/gremlin/process/traversal/TextP.html++[`TextP`],
-link:++https://tinkerpop.apache.org/javadocs/x.y.z/core/org/apache/tinkerpop/gremlin/structure/T.html++[`T`]
+link:++https://tinkerpop.apache.org/javadocs/x.y.z/core/org/apache/tinkerpop/gremlin/structure/T.html++[`T`],
+link:++https://tinkerpop.apache.org/docs/current/recipes/#has-traversal++[Recipes - Anti-pattern]
 
 [[id-step]]
 === Id Step
diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc
index a57fdea..2b4f659 100644
--- a/docs/src/upgrade/release-3.5.x.asciidoc
+++ b/docs/src/upgrade/release-3.5.x.asciidoc
@@ -41,6 +41,11 @@
 
 === Upgrading for Users
 
+==== SparkIOUtil utility
+
+A utility class `SparkIOUtil` has been introduced that allows users to load a graph into a Spark RDD via the
+`loadVertices(org.apache.commons.configuration2.Configuration, JavaSparkContext)` method.
+
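+As a minimal sketch of its usage (the input path and Spark settings below are illustrative):
+
+[source,java]
+----
+SparkConf sparkConf = new SparkConf().setAppName("Spark Graph")
+        .set(SparkLauncher.SPARK_MASTER, "local[4]")
+        .set(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
+
+Configuration graphConfiguration = new BaseConfiguration();
+graphConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+graphConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, "data/tinkerpop-modern-v3d0.kryo");
+
+// load vertices into a Spark RDD
+JavaPairRDD<Object, VertexWritable> verticesRDD = SparkIOUtil.loadVertices(graphConfiguration, sparkContext);
+----
+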
 ==== Gremlin-Go
 
 After introducing a number of release candidates over the past several months, the official release of Gremlin-Go is now
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index ed75730..89dfcc7 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -534,14 +534,8 @@
                                 .toArray(CompletableFuture[]::new))
                         .join();
             } catch (CompletionException ex) {
-                Throwable cause = ExceptionUtils.getRootCause(ex);
-                if (null != cause) {
-                    logger.error("", cause);
-                    this.initializationFailure = cause;
-                } else {
-                    logger.error("", ex);
-                    this.initializationFailure = ex;
-                }
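+                // keep the exact exception rather than its root cause so the full error chain is preserved for callers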
+                logger.error("Initialization failed", ex);
+                this.initializationFailure = ex;
             } finally {
                 hostExecutor.shutdown();
             }
@@ -559,9 +553,9 @@
         }
 
         private void throwNoHostAvailableException() {
+            final Throwable rootCause = ExceptionUtils.getRootCause(initializationFailure);
             // allow certain exceptions to propagate as a cause
-            if (initializationFailure != null && (initializationFailure instanceof SSLException ||
-                    initializationFailure instanceof ConnectException)) {
+            if (rootCause instanceof SSLException || rootCause instanceof ConnectException) {
                 throw new NoHostAvailableException(initializationFailure);
             } else {
                 throw new NoHostAvailableException();
diff --git a/gremlin-javascript/src/main/javascript/gremlin-javascript/package-lock.json b/gremlin-javascript/src/main/javascript/gremlin-javascript/package-lock.json
index 75cde7d..b722eb6 100644
--- a/gremlin-javascript/src/main/javascript/gremlin-javascript/package-lock.json
+++ b/gremlin-javascript/src/main/javascript/gremlin-javascript/package-lock.json
@@ -2526,7 +2526,7 @@
     "require-directory": {
       "version": "2.1.1",
       "resolved": "https://registry.npmjs.org/require-directory/-/require-directory-2.1.1.tgz",
-      "integrity": "sha1-jGStX9MNqxyXbiNE/+f3kqam30I=",
+      "integrity": "sha512-fGxEI7+wsG9xrvdjsrlmL22OMTTiHRwAMroiEeMgq8gzoLC/PQr7RsRDSTLUg/bZAZtF+TVIkHc6/4RIKrui+Q==",
       "dev": true
     },
     "require-from-string": {
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 04945e2..e44660e 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -1862,7 +1862,7 @@
                 if (client instanceof Client.SessionedClient) {
                     assertThat(re2.getCause().getCause(), instanceOf(ConnectionException.class));
                 } else {
-                    assertThat(re2.getCause(), instanceOf(ConnectException.class));
+                    assertThat(re2.getCause().getCause().getCause(), instanceOf(ConnectException.class));
                 }
             }
 
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index d364b3d..9b7a012 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -68,6 +68,7 @@
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
+import org.apache.tinkerpop.gremlin.spark.structure.io.SparkIOUtil;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
 import org.apache.tinkerpop.gremlin.structure.Direction;
@@ -282,28 +283,26 @@
                     }
                 }
             }
-            final InputRDD inputRDD;
-            final OutputRDD outputRDD;
+            final InputRDD inputRDD = SparkIOUtil.createInputRDD(hadoopConfiguration);
             final boolean filtered;
+            // if the input class can filter on load, then set the filters
+            if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class, InputFormat.class))) {
+                GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
+                filtered = false;
+            } else if (inputRDD instanceof GraphFilterAware) {
+                ((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
+                filtered = false;
+            } else if (this.graphFilter.hasFilter()) {
+                filtered = true;
+            } else {
+                filtered = false;
+            }
+
+            final OutputRDD outputRDD;
             try {
-                inputRDD = InputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class)) ?
-                        hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputRDD.class, InputRDD.class).newInstance() :
-                        InputFormatRDD.class.newInstance();
                 outputRDD = OutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class)) ?
                         hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, OutputRDD.class, OutputRDD.class).newInstance() :
                         OutputFormatRDD.class.newInstance();
-                // if the input class can filter on load, then set the filters
-                if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class, InputFormat.class))) {
-                    GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
-                    filtered = false;
-                } else if (inputRDD instanceof GraphFilterAware) {
-                    ((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
-                    filtered = false;
-                } else if (this.graphFilter.hasFilter()) {
-                    filtered = true;
-                } else {
-                    filtered = false;
-                }
             } catch (final InstantiationException | IllegalAccessException e) {
                 throw new IllegalStateException(e.getMessage(), e);
             }
@@ -331,7 +330,7 @@
                 updateLocalConfiguration(sparkContext, hadoopConfiguration);
                 // create a message-passing friendly rdd from the input rdd
                 boolean partitioned = false;
-                JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(graphComputerConfiguration, sparkContext);
+                JavaPairRDD<Object, VertexWritable> loadedGraphRDD = SparkIOUtil.loadVertices(inputRDD, graphComputerConfiguration, sparkContext);
                 // if there are vertex or edge filters, filter the loaded graph rdd prior to partitioning and persisting
                 if (filtered) {
                     this.logger.debug("Filtering the loaded graphRDD: " + this.graphFilter);
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIOUtil.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIOUtil.java
new file mode 100644
index 0000000..1415b75
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIOUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to load a graph as Spark RDDs.
+ *
+ * @author Boxuan Li (https://li-boxuan.com)
+ */
+public class SparkIOUtil {
+    private static final Logger logger = LoggerFactory.getLogger(SparkIOUtil.class);
+
+    /**
+     * Load a graph into Spark RDDs. Note: the graph configuration must include {@value Constants#GREMLIN_HADOOP_GRAPH_READER}.
+     * <p>
+     * See the example below:
+     * <pre>
+     *     SparkConf sparkConf = new SparkConf().setAppName("Spark Graph")
+     *         .set(SparkLauncher.SPARK_MASTER, "local[4]")
+     *         .set(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+     *     JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
+     *
+     *     Configuration sparkGraphConfiguration = new BaseConfiguration();
+     *     sparkGraphConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+     *     sparkGraphConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
+     *         SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern-v3d0.kryo"));
+     *
+     *     // load vertices
+     *     JavaPairRDD verticesRDD = SparkIOUtil.loadVertices(sparkGraphConfiguration, sparkContext);
+     * </pre>
+     *
+     * @param sparkGraphConfiguration graph configurations
+     * @param sparkContext            a JavaSparkContext instance
+     * @return vertices in Spark RDD
+     */
+    public static JavaPairRDD<Object, VertexWritable> loadVertices(
+        final org.apache.commons.configuration2.Configuration sparkGraphConfiguration,
+        final JavaSparkContext sparkContext) {
+        assert sparkGraphConfiguration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_READER);
+        logger.debug("Loading vertices into Spark RDD...");
+        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(sparkGraphConfiguration);
+        return loadVertices(createInputRDD(hadoopConfiguration), sparkGraphConfiguration, sparkContext);
+    }
+
+    /**
+     * Load a graph into Spark RDDs.
+     *
+     * @param inputRDD                an InputRDD instance
+     * @param sparkGraphConfiguration graph configurations
+     * @param sparkContext            a JavaSparkContext instance
+     * @return vertices in Spark RDD
+     */
+    public static JavaPairRDD<Object, VertexWritable> loadVertices(
+        final InputRDD inputRDD,
+        final org.apache.commons.configuration2.Configuration sparkGraphConfiguration,
+        final JavaSparkContext sparkContext) {
+        return inputRDD.readGraphRDD(sparkGraphConfiguration, sparkContext);
+    }
+
+    /**
+     * Create an InputRDD instance based on {@value Constants#GREMLIN_HADOOP_GRAPH_READER} config
+     * <p>
+     * If the class configured by {@value Constants#GREMLIN_HADOOP_GRAPH_READER} implements {@link InputRDD},
+     * instantiate it; otherwise, instantiate an {@link InputFormatRDD} instance.
+     *
+     * @param hadoopConfiguration hadoop configurations
+     * @return generated InputRDD instance
+     */
+    public static InputRDD createInputRDD(final Configuration hadoopConfiguration) {
+        final InputRDD inputRDD;
+        try {
+            inputRDD = InputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class)) ?
+                hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputRDD.class, InputRDD.class).newInstance() :
+                InputFormatRDD.class.newInstance();
+        } catch (final InstantiationException | IllegalAccessException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+        return inputRDD;
+    }
+}
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkRDDLoadTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkRDDLoadTest.java
new file mode 100644
index 0000000..3169ec9
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkRDDLoadTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration2.BaseConfiguration;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Boxuan Li (https://li-boxuan.com)
+ */
+public class SparkRDDLoadTest extends AbstractSparkTest {
+    @Test
+    public void shouldLoadVerticesRDDWithInputRDD() {
+        final JavaSparkContext sparkContext = getSparkContext();
+
+        // load vertices with inputRDD config
+        final Configuration sparkGraphConfiguration = new BaseConfiguration();
+        sparkGraphConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, ExampleInputRDD.class.getCanonicalName());
+        JavaPairRDD<Object, VertexWritable> verticesRDD = SparkIOUtil.loadVertices(sparkGraphConfiguration, sparkContext);
+        assertEquals(4, verticesRDD.count());
+        assertEquals(123, verticesRDD.values()
+            .map(vertexWritable -> (int) vertexWritable.get().values("age").next())
+            .reduce(Integer::sum).longValue());
+
+        // load vertices with inputRDD object
+        verticesRDD = SparkIOUtil.loadVertices(new ExampleInputRDD(), new BaseConfiguration(), sparkContext);
+        assertEquals(4, verticesRDD.count());
+        assertEquals(123, verticesRDD.values()
+            .map(vertexWritable -> (int) vertexWritable.get().values("age").next())
+            .reduce(Integer::sum).longValue());
+
+        sparkContext.stop();
+    }
+
+    @Test
+    public void shouldLoadVerticesRDDWithInputFormat() {
+        final JavaSparkContext sparkContext = getSparkContext();
+
+        final Configuration sparkGraphConfiguration = new BaseConfiguration();
+        sparkGraphConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+        sparkGraphConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
+            SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern-v3d0.kryo"));
+
+        // load vertices
+        JavaPairRDD<Object, VertexWritable> verticesRDD = SparkIOUtil.loadVertices(sparkGraphConfiguration, sparkContext);
+        assertEquals(6, verticesRDD.count());
+
+        sparkContext.stop();
+    }
+
+    private JavaSparkContext getSparkContext() {
+        SparkConf sparkConf = new SparkConf()
+            .setAppName("Spark Graph")
+            .set(SparkLauncher.SPARK_MASTER, "local[4]")
+            .set(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+        return new JavaSparkContext(sparkConf);
+    }
+}