Merge branch '3.6-dev'
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 15bb642..817e19f 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -395,6 +395,7 @@
 === TinkerPop 3.5.7 (Release Date: NOT OFFICIALLY RELEASED YET)
 
 * Fixed a memory leak in the Gremlin.Net driver that only occurred if a CancellationToken was provided.
+* Added `gremlin.spark.dontDeleteNonEmptyOutput` to `spark-gremlin` to prevent deletion of the output location when it is not empty.
 
 [[release-3-5-6]]
 === TinkerPop 3.5.6 (Release Date: May 1, 2023)
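
For context, a minimal sketch of opting into the new behavior (illustrative only: `BaseConfiguration`/`GraphFactory` are the standard commons-configuration and TinkerPop entry points, and only the `gremlin.spark.dontDeleteNonEmptyOutput` key comes from this change):

    // ask SparkGraphComputer to fail rather than silently delete a
    // non-empty output location before a job runs
    Configuration conf = new BaseConfiguration();
    conf.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
    conf.setProperty("gremlin.hadoop.outputLocation", "output");
    conf.setProperty("gremlin.spark.dontDeleteNonEmptyOutput", true); // defaults to false
    Graph graph = GraphFactory.open(conf);
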
diff --git a/gremlin-python/src/main/python/README.rst b/gremlin-python/src/main/python/README.rst
index a857c2b..ea6b4d8 100644
--- a/gremlin-python/src/main/python/README.rst
+++ b/gremlin-python/src/main/python/README.rst
@@ -42,16 +42,16 @@
 
     >>> from gremlin_python.process.anonymous_traversal import traversal
     >>> from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
-    >>> g = traversal().withRemote(DriverRemoteConnection('ws://localhost:8182/gremlin','g'))
+    >>> g = traversal().with_remote(DriverRemoteConnection('ws://localhost:8182/gremlin','g'))
 
 Once "g" has been created using a connection, it is then possible to start writing Gremlin traversals to query the
 remote graph:
 
-    >>> g.V().both()[1:3].toList()
+    >>> g.V().both()[1:3].to_list()
     [v[2], v[4]]
-    >>> g.V().both()[1].toList()
+    >>> g.V().both()[1].to_list()
     [v[2]]
-    >>> g.V().both().name.toList()
+    >>> g.V().both().name.to_list()
     [lop, vadas, josh, marko, marko, josh, peter, ripple, lop, marko, josh, lop]
 
 -----------------
@@ -84,7 +84,7 @@
 
     def create_vertex(self, vid, vlabel):
         # default database cardinality is used when Cardinality argument is not specified
-        g.addV(vlabel).property(id, vid). \
+        g.add_v(vlabel).property(id, vid). \
           property(single, 'name', 'Apache'). \
           property('lastname', 'Tinkerpop'). \
           next()
@@ -95,13 +95,13 @@
 .. code:: python
 
     def list_all(self, limit=500):
-        g.V().limit(limit).elementMap().toList()
+        g.V().limit(limit).element_map().to_list()
 
     def find_vertex(self, vid):
-        g.V(vid).elementMap().next()
+        g.V(vid).element_map().next()
 
     def list_by_label_name(self, vlabel, name):
-        g.V().has(vlabel, 'name', name).elementMap().toList()
+        g.V().has(vlabel, 'name', name).element_map().to_list()
 
 Update Vertex
 ^^^^^^^^^^^^^
diff --git a/gremlin-python/src/main/python/example.py b/gremlin-python/src/main/python/example.py
index b3a3340..64e0c0b 100644
--- a/gremlin-python/src/main/python/example.py
+++ b/gremlin-python/src/main/python/example.py
@@ -32,7 +32,7 @@
     #
     # which starts it in "console" mode with an empty in-memory TinkerGraph ready to go bound to a
     # variable named "g" as referenced in the following line.
-    g = traversal().withRemote(DriverRemoteConnection('ws://localhost:8182/gremlin', 'g'))
+    g = traversal().with_remote(DriverRemoteConnection('ws://localhost:8182/gremlin', 'g'))
 
     # add some data - be sure to use a terminating step like iterate() so that the traversal
     # "executes". iterate() does not return any data and is used to just generate side-effects
diff --git a/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py b/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py
index ed4e0ca..b2cf4ea 100644
--- a/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py
+++ b/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py
@@ -134,7 +134,7 @@
         val = True if v is None else v
         if options_strategy is None:
             options_strategy = OptionsStrategy({k: val})
-            source = self.withStrategies(options_strategy)
+            source = self.with_strategies(options_strategy)
         else:
             options_strategy[1].configuration[k] = val
 
@@ -1604,7 +1604,7 @@
 
 
 def addE(*args):
-    return __.addE(*args)
+    return __.add_e(*args)
 
 
 def add_e(*args):
@@ -1612,7 +1612,7 @@
 
 
 def addV(*args):
-    return __.addV(*args)
+    return __.add_v(*args)
 
 
 def add_v(*args):
@@ -1640,7 +1640,7 @@
 
 
 def bothE(*args):
-    return __.bothE(*args)
+    return __.both_e(*args)
 
 
 def both_e(*args):
@@ -1648,7 +1648,7 @@
 
 
 def bothV(*args):
-    return __.bothV(*args)
+    return __.both_v(*args)
 
 
 def both_v(*args):
@@ -1688,7 +1688,7 @@
 
 
 def cyclicPath(*args):
-    return __.cyclicPath(*args)
+    return __.cyclic_path(*args)
 
 
 def cyclic_path(*args):
@@ -1708,7 +1708,7 @@
 
 
 def elementMap(*args):
-    return __.elementMap(*args)
+    return __.element_map(*args)
 
 
 def element_map(*args):
@@ -1728,7 +1728,7 @@
 
 
 def flatMap(*args):
-    return __.flatMap(*args)
+    return __.flat_map(*args)
 
 
 def flat_map(*args):
@@ -1744,7 +1744,7 @@
 
 
 def groupCount(*args):
-    return __.groupCount(*args)
+    return __.group_count(*args)
 
 
 def group_count(*args):
@@ -1756,7 +1756,7 @@
 
 
 def hasId(*args):
-    return __.hasId(*args)
+    return __.has_id(*args)
 
 
 def has_id(*args):
@@ -1764,7 +1764,7 @@
 
 
 def hasKey(*args):
-    return __.hasKey(*args)
+    return __.has_key_(*args)
 
 
 def has_key_(*args):
@@ -1772,7 +1772,7 @@
 
 
 def hasLabel(*args):
-    return __.hasLabel(*args)
+    return __.has_label(*args)
 
 
 def has_label(*args):
@@ -1780,7 +1780,7 @@
 
 
 def hasNot(*args):
-    return __.hasNot(*args)
+    return __.has_not(*args)
 
 
 def has_not(*args):
@@ -1788,7 +1788,7 @@
 
 
 def hasValue(*args):
-    return __.hasValue(*args)
+    return __.has_value(*args)
 
 
 def has_value(*args):
@@ -1804,7 +1804,7 @@
 
 
 def inE(*args):
-    return __.inE(*args)
+    return __.in_e(*args)
 
 
 def in_e(*args):
@@ -1812,7 +1812,7 @@
 
 
 def inV(*args):
-    return __.inV(*args)
+    return __.in_v(*args)
 
 
 def in_v(*args):
@@ -1904,7 +1904,7 @@
 
 
 def otherV(*args):
-    return __.otherV(*args)
+    return __.other_v(*args)
 
 
 def other_v(*args):
@@ -1916,7 +1916,7 @@
 
 
 def outE(*args):
-    return __.outE(*args)
+    return __.out_e(*args)
 
 
 def out_e(*args):
@@ -1924,7 +1924,7 @@
 
 
 def outV(*args):
-    return __.outV(*args)
+    return __.out_v(*args)
 
 
 def out_v(*args):
@@ -1948,7 +1948,7 @@
 
 
 def propertyMap(*args):
-    return __.propertyMap(*args)
+    return __.property_map(*args)
 
 
 def property_map(*args):
@@ -1976,7 +1976,7 @@
 
 
 def sideEffect(*args):
-    return __.sideEffect(*args)
+    return __.side_effect(*args)
 
 
 def side_effect(*args):
@@ -1984,7 +1984,7 @@
 
 
 def simplePath(*args):
-    return __.simplePath(*args)
+    return __.simple_path(*args)
 
 
 def simple_path(*args):
@@ -2012,7 +2012,7 @@
 
 
 def timeLimit(*args):
-    return __.timeLimit(*args)
+    return __.time_limit(*args)
 
 
 def time_limit(*args):
@@ -2028,7 +2028,7 @@
 
 
 def toE(*args):
-    return __.toE(*args)
+    return __.to_e(*args)
 
 
 def to_e(*args):
@@ -2036,7 +2036,7 @@
 
 
 def toV(*args):
-    return __.toV(*args)
+    return __.to_v(*args)
 
 
 def to_v(*args):
@@ -2064,7 +2064,7 @@
 
 
 def valueMap(*args):
-    return __.valueMap(*args)
+    return __.value_map(*args)
 
 
 def value_map(*args):
diff --git a/gremlin-python/src/main/python/gremlin_python/process/traversal.py b/gremlin-python/src/main/python/gremlin_python/process/traversal.py
index a293bd5..dc259d3 100644
--- a/gremlin-python/src/main/python/gremlin_python/process/traversal.py
+++ b/gremlin-python/src/main/python/gremlin_python/process/traversal.py
@@ -77,7 +77,7 @@
     def iterate(self):
         self.bytecode.add_step("none")
         while True:
-            try: self.nextTraverser()
+            try: self.next_traverser()
             except StopIteration: return self
 
     def nextTraverser(self):
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 7519b8f..eb2f94c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -73,6 +73,7 @@
     public static final String GREMLIN_SPARK_PERSIST_STORAGE_LEVEL = "gremlin.spark.persistStorageLevel";
     public static final String GREMLIN_SPARK_SKIP_PARTITIONER = "gremlin.spark.skipPartitioner"; // don't partition the loadedGraphRDD
     public static final String GREMLIN_SPARK_SKIP_GRAPH_CACHE = "gremlin.spark.skipGraphCache";  // don't cache the loadedGraphRDD (ignores graphStorageLevel)
+    public static final String GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT = "gremlin.spark.dontDeleteNonEmptyOutput"; // fail rather than delete the output location when it is not empty
     public static final String SPARK_SERIALIZER = "spark.serializer";
     public static final String SPARK_KRYO_REGISTRATOR = "spark.kryo.registrator";
     public static final String SPARK_KRYO_REGISTRATION_REQUIRED = "spark.kryo.registrationRequired";
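
In embedded Java the same key can be set through the new constant rather than the raw string (sketch; `configuration` stands for any Apache Commons `Configuration` instance):

    // equivalent to writing "gremlin.spark.dontDeleteNonEmptyOutput" by hand
    configuration.setProperty(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, true);
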
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 62171ae..5f45583 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -315,10 +315,22 @@
 
             SparkMemory memory = null;
             // delete output location
+            final boolean dontDeleteNonEmptyOutput =
+                    graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, false);
             final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
             if (null != outputLocation) {
-                if (outputToHDFS && fileSystemStorage.exists(outputLocation))
-                    fileSystemStorage.rm(outputLocation);
+                if (outputToHDFS && fileSystemStorage.exists(outputLocation)) {
+                    // when the flag is set, fail fast rather than silently deleting prior results
+                    if (dontDeleteNonEmptyOutput && !fileSystemStorage.ls(outputLocation).isEmpty())
+                        throw new IllegalStateException("The output location '" + outputLocation + "' is not empty");
+                    fileSystemStorage.rm(outputLocation);
+                }
                 if (outputToSpark && sparkContextStorage.exists(outputLocation))
                     sparkContextStorage.rm(outputLocation);
             }
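
With the flag enabled, a job submitted against a non-empty output location now fails up front instead of overwriting earlier results. A sketch of the observable behavior (hedged: the exact wrapping of the exception depends on the submission path):

    try {
        graph.compute(SparkGraphComputer.class)
             .program(CloneVertexProgram.build().create())
             .submit().get();
    } catch (Exception e) {
        // surfaces the IllegalStateException raised above:
        // "The output location '<outputLocation>' is not empty"
    }
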
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 6c1efb1..c428081 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -30,6 +30,7 @@
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
@@ -108,7 +109,14 @@
             put(Graph.GRAPH, HadoopGraph.class.getName());
             put(Constants.GREMLIN_HADOOP_GRAPH_READER, graphSONInput ? GraphSONInputFormat.class.getCanonicalName() : GryoInputFormat.class.getCanonicalName());
             put(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
-            put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, getWorkingDirectory());
+            // a previous run may have left results behind, so clear the output location
+            // before handing it to the graph configuration
+            String outputLocation = getWorkingDirectory();
+            final FileSystemStorage fileSystemStorage = FileSystemStorage.open();
+            if (!fileSystemStorage.ls(outputLocation).isEmpty()) {
+                fileSystemStorage.rm(outputLocation);
+                outputLocation = getWorkingDirectory(); // re-resolve the working directory after the purge
+            }
+            put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
             put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
 
             put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);  // this makes the test suite go really fast
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java
index cd57d42..3ac4369 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java
@@ -94,4 +94,50 @@
         assertEquals(totalV, totalVRef);
         assertEquals(totalE, totalERef);
     }
+
+    @Test
+    public void shouldStopPurgingOfExistingNonEmptyFolder() throws Exception {
+        // Build the random graph
+        final TinkerGraph randomGraph = TinkerGraph.open();
+        final int totalVertices = 200000;
+        TestHelper.createRandomGraph(randomGraph, totalVertices, 100);
+        final String inputLocation = TestHelper.makeTestDataFile(GryoSerializerIntegrateTest.class,
+                UUID.randomUUID().toString(),
+                "random-graph.kryo");
+        randomGraph.io(IoCore.gryo()).writeGraph(inputLocation);
+        randomGraph.clear();
+        randomGraph.close();
+
+        // Serialize the graph to disk via CloneVertexProgram + SparkGraphComputer
+        final String outputLocation = TestHelper.makeTestDataDirectory(GryoSerializerIntegrateTest.class, UUID.randomUUID().toString());
+        Configuration configuration1 = getBaseConfiguration();
+        configuration1.clearProperty(Constants.SPARK_SERIALIZER); // ensure proper default to GryoSerializer
+        configuration1.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation);
+        configuration1.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+        configuration1.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
+        configuration1.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+        configuration1.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
+        Graph graph = GraphFactory.open(configuration1);
+        graph.compute(SparkGraphComputer.class).program(CloneVertexProgram.build().create()).submit().get();
+
+        // Read the total vertex count through the original graph as the golden reference
+        Configuration configuration2 = getBaseConfiguration();
+        configuration2.clearProperty(Constants.SPARK_SERIALIZER); // ensure proper default to GryoSerializer
+        configuration2.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation);
+        configuration2.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+        configuration2.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, NullOutputFormat.class.getCanonicalName());
+        configuration2.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
+        configuration2.setProperty(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, true);
+        graph = GraphFactory.open(configuration2);
+        long totalVRef = graph.traversal().withComputer(SparkGraphComputer.class).V().count().next().longValue();
+
+        assertEquals(totalVertices, totalVRef);
+
+        // Should see an exception when reusing the previous outputLocation, which is not empty.
+        // The guard only fires when the flag is set, so enable it on configuration1 as well.
+        configuration1.setProperty(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, true);
+        graph = GraphFactory.open(configuration1);
+        try {
+            graph.traversal().withComputer(SparkGraphComputer.class).E().count().next().longValue();
+            fail("Expected the job to fail because the output location is not empty");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("is not empty"));
+        }
+    }
 }