Merge pull request #2053 from ministat/hj_TINKERPOP-2941

[TINKERPOP-2941] Stop deleting the output location if it is not empty
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index cd68844..530b05f 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -24,6 +24,7 @@
 === TinkerPop 3.5.7 (Release Date: NOT OFFICIALLY RELEASED YET)
 
 * Fixed a memory leak in the Gremlin.Net driver that only occurred if a CancellationToken was provided.
+* Added `gremlin.spark.dontDeleteNonEmptyOutput` to `spark-gremlin` to stop a job from deleting the output folder if it is not empty.
 
 [[release-3-5-6]]
 === TinkerPop 3.5.6 (Release Date: May 1, 2023)
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 7519b8f..eb2f94c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -73,6 +73,7 @@
     public static final String GREMLIN_SPARK_PERSIST_STORAGE_LEVEL = "gremlin.spark.persistStorageLevel";
     public static final String GREMLIN_SPARK_SKIP_PARTITIONER = "gremlin.spark.skipPartitioner"; // don't partition the loadedGraphRDD
     public static final String GREMLIN_SPARK_SKIP_GRAPH_CACHE = "gremlin.spark.skipGraphCache";  // don't cache the loadedGraphRDD (ignores graphStorageLevel)
+    public static final String GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT = "gremlin.spark.dontDeleteNonEmptyOutput"; // when true, SparkGraphComputer throws an IllegalStateException instead of deleting an existing, non-empty file system output location
     public static final String SPARK_SERIALIZER = "spark.serializer";
     public static final String SPARK_KRYO_REGISTRATOR = "spark.kryo.registrator";
     public static final String SPARK_KRYO_REGISTRATION_REQUIRED = "spark.kryo.registrationRequired";
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 62171ae..5f45583 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -315,10 +315,22 @@
 
             SparkMemory memory = null;
             // delete output location
+            final boolean dontDeleteNonEmptyOutput = graphComputerConfiguration
+                    .getBoolean(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, false);
             final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
             if (null != outputLocation) {
-                if (outputToHDFS && fileSystemStorage.exists(outputLocation))
-                    fileSystemStorage.rm(outputLocation);
+                if (outputToHDFS && fileSystemStorage.exists(outputLocation)) {
+                    // Purging is the default behavior. With the guard property enabled,
+                    // the location is purged only while it holds no entries; a populated
+                    // location is reported as an error so existing data is left untouched.
+                    if (dontDeleteNonEmptyOutput
+                            && !fileSystemStorage.ls(outputLocation).isEmpty()) {
+                        throw new IllegalStateException("The output location '" + outputLocation + "' is not empty");
+                    }
+
+                    // reaching this point means the guard is off or nothing is stored there
+                    fileSystemStorage.rm(outputLocation);
+                }
                 if (outputToSpark && sparkContextStorage.exists(outputLocation))
                     sparkContextStorage.rm(outputLocation);
             }
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 6c1efb1..c428081 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -30,6 +30,7 @@
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
@@ -108,7 +109,14 @@
             put(Graph.GRAPH, HadoopGraph.class.getName());
             put(Constants.GREMLIN_HADOOP_GRAPH_READER, graphSONInput ? GraphSONInputFormat.class.getCanonicalName() : GryoInputFormat.class.getCanonicalName());
             put(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
-            put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, getWorkingDirectory());
+            // start from an empty output location: purge whatever is already stored there
+            String workingDirectory = getWorkingDirectory();
+            final FileSystemStorage storage = FileSystemStorage.open();
+            if (!storage.ls(workingDirectory).isEmpty()) {
+                storage.rm(workingDirectory);
+                workingDirectory = getWorkingDirectory(); // requested again once the old content is gone
+            }
+            put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, workingDirectory);
             put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
 
             put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);  // this makes the test suite go really fast
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java
index cd57d42..3ac4369 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java
@@ -94,4 +94,63 @@
         assertEquals(totalV, totalVRef);
         assertEquals(totalE, totalERef);
     }
+
+    /**
+     * Verifies that a job is rejected, instead of purging its output location, when that location
+     * already holds data and {@code gremlin.spark.dontDeleteNonEmptyOutput} is enabled.
+     */
+    @Test
+    public void shouldStopPurgingOfExistingNonEmptyFolder() throws Exception {
+        // Build the random graph
+        final TinkerGraph randomGraph = TinkerGraph.open();
+        final int totalVertices = 200000;
+        TestHelper.createRandomGraph(randomGraph, totalVertices, 100);
+        final String inputLocation = TestHelper.makeTestDataFile(GryoSerializerIntegrateTest.class,
+                UUID.randomUUID().toString(),
+                "random-graph.kryo");
+        randomGraph.io(IoCore.gryo()).writeGraph(inputLocation);
+        randomGraph.clear();
+        randomGraph.close();
+
+        // Serialize the graph to disk by CloneVertexProgram + SparkGraphComputer
+        final String outputLocation = TestHelper.makeTestDataDirectory(GryoSerializerIntegrateTest.class, UUID.randomUUID().toString());
+        Configuration configuration1 = getBaseConfiguration();
+        configuration1.clearProperty(Constants.SPARK_SERIALIZER); // ensure proper default to GryoSerializer
+        configuration1.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation);
+        configuration1.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+        configuration1.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
+        configuration1.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+        configuration1.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
+        Graph graph = GraphFactory.open(configuration1);
+        graph.compute(SparkGraphComputer.class).program(CloneVertexProgram.build().create()).submit().get();
+
+        // Read the vertex count through the original graph and check it against the generated size
+        Configuration configuration2 = getBaseConfiguration();
+        configuration2.clearProperty(Constants.SPARK_SERIALIZER); // ensure proper default to GryoSerializer
+        configuration2.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation);
+        configuration2.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+        configuration2.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, NullOutputFormat.class.getCanonicalName());
+        configuration2.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
+        configuration2.setProperty(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, true);
+        graph = GraphFactory.open(configuration2);
+        long totalVRef = graph.traversal().withComputer(SparkGraphComputer.class).V().count().next().longValue();
+
+        assertEquals(totalVertices, totalVRef);
+
+        // Reusing the previous outputLocation, which now holds the cloned graph, must be rejected.
+        // The guard has to be enabled on the configuration that is actually resubmitted.
+        configuration1.setProperty(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, true);
+        graph = GraphFactory.open(configuration1);
+        boolean rejected = false;
+        try {
+            graph.traversal().withComputer(SparkGraphComputer.class).E().count().next().longValue();
+        } catch (Exception e) {
+            // the failure may reach the caller wrapped, so search the whole cause chain
+            for (Throwable cause = e; cause != null && !rejected; cause = cause.getCause()) {
+                final String message = cause.getMessage();
+                rejected = message != null && message.contains("is not empty");
+            }
+        }
+        assertTrue(rejected);
+    }
 }