Merge pull request #2053 from ministat/hj_TINKERPOP-2941
[TINKERPOP-2941] Stop deleting the output location if it is not empty
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index cd68844..530b05f 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -24,6 +24,7 @@
=== TinkerPop 3.5.7 (Release Date: NOT OFFICIALLY RELEASED YET)
* Fixed a memory leak in the Gremlin.Net driver that only occurred if a CancellationToken was provided.
+* Added `gremlin.spark.dontDeleteNonEmptyOutput` to `spark-gremlin` to prevent deletion of the output location when it is not empty.
[[release-3-5-6]]
=== TinkerPop 3.5.6 (Release Date: May 1, 2023)
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 7519b8f..eb2f94c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -73,6 +73,7 @@
public static final String GREMLIN_SPARK_PERSIST_STORAGE_LEVEL = "gremlin.spark.persistStorageLevel";
public static final String GREMLIN_SPARK_SKIP_PARTITIONER = "gremlin.spark.skipPartitioner"; // don't partition the loadedGraphRDD
public static final String GREMLIN_SPARK_SKIP_GRAPH_CACHE = "gremlin.spark.skipGraphCache"; // don't cache the loadedGraphRDD (ignores graphStorageLevel)
+ public static final String GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT = "gremlin.spark.dontDeleteNonEmptyOutput"; // don't delete the output if it is not empty
public static final String SPARK_SERIALIZER = "spark.serializer";
public static final String SPARK_KRYO_REGISTRATOR = "spark.kryo.registrator";
public static final String SPARK_KRYO_REGISTRATION_REQUIRED = "spark.kryo.registrationRequired";
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 62171ae..5f45583 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -315,10 +315,22 @@
SparkMemory memory = null;
// delete output location
+ final boolean dontDeleteNonEmptyOutput =
+ graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, false);
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
if (null != outputLocation) {
- if (outputToHDFS && fileSystemStorage.exists(outputLocation))
- fileSystemStorage.rm(outputLocation);
+ if (outputToHDFS && fileSystemStorage.exists(outputLocation)) {
+ if (dontDeleteNonEmptyOutput) {
+                            // only remove the output location when it is empty; otherwise fail fast
+ if (fileSystemStorage.ls(outputLocation).size() == 0) {
+ fileSystemStorage.rm(outputLocation);
+ } else {
+ throw new IllegalStateException("The output location '" + outputLocation + "' is not empty");
+ }
+ } else {
+ fileSystemStorage.rm(outputLocation);
+ }
+ }
if (outputToSpark && sparkContextStorage.exists(outputLocation))
sparkContextStorage.rm(outputLocation);
}
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 6c1efb1..c428081 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -30,6 +30,7 @@
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
@@ -108,7 +109,14 @@
put(Graph.GRAPH, HadoopGraph.class.getName());
put(Constants.GREMLIN_HADOOP_GRAPH_READER, graphSONInput ? GraphSONInputFormat.class.getCanonicalName() : GryoInputFormat.class.getCanonicalName());
put(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
- put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, getWorkingDirectory());
+                // remove any leftover output from a previous run so the location starts empty
+ String outputLocation = getWorkingDirectory();
+ FileSystemStorage fileSystemStorage = FileSystemStorage.open();
+ if (fileSystemStorage.ls(outputLocation).size() > 0) {
+ fileSystemStorage.rm(outputLocation);
+ outputLocation = getWorkingDirectory();
+ }
+ put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); // this makes the test suite go really fast
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java
index cd57d42..3ac4369 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java
@@ -94,4 +94,50 @@
assertEquals(totalV, totalVRef);
assertEquals(totalE, totalERef);
}
+
+ @Test
+ public void shouldStopPurgingOfExistingNonEmptyFolder() throws Exception {
+ // Build the random graph
+ final TinkerGraph randomGraph = TinkerGraph.open();
+ final int totalVertices = 200000;
+ TestHelper.createRandomGraph(randomGraph, totalVertices, 100);
+ final String inputLocation = TestHelper.makeTestDataFile(GryoSerializerIntegrateTest.class,
+ UUID.randomUUID().toString(),
+ "random-graph.kryo");
+ randomGraph.io(IoCore.gryo()).writeGraph(inputLocation);
+ randomGraph.clear();
+ randomGraph.close();
+
+ // Serialize the graph to disk by CloneVertexProgram + SparkGraphComputer
+ final String outputLocation = TestHelper.makeTestDataDirectory(GryoSerializerIntegrateTest.class, UUID.randomUUID().toString());
+ Configuration configuration1 = getBaseConfiguration();
+ configuration1.clearProperty(Constants.SPARK_SERIALIZER); // ensure proper default to GryoSerializer
+ configuration1.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation);
+ configuration1.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+ configuration1.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
+ configuration1.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+ configuration1.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
+ Graph graph = GraphFactory.open(configuration1);
+ graph.compute(SparkGraphComputer.class).program(CloneVertexProgram.build().create()).submit().get();
+
+ // Read the total Vertex/Edge count for golden reference through the original graph
+ Configuration configuration2 = getBaseConfiguration();
+ configuration2.clearProperty(Constants.SPARK_SERIALIZER); // ensure proper default to GryoSerializer
+ configuration2.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation);
+ configuration2.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+ configuration2.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, NullOutputFormat.class.getCanonicalName());
+ configuration2.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
+ configuration2.setProperty(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, true);
+ graph = GraphFactory.open(configuration2);
+ long totalVRef = graph.traversal().withComputer(SparkGraphComputer.class).V().count().next().longValue();
+
+ assertEquals(totalVRef, totalVertices);
+        // Reusing the previous outputLocation, which is now non-empty, should raise an exception
+ graph = GraphFactory.open(configuration1);
+ try {
+ graph.traversal().withComputer(SparkGraphComputer.class).E().count().next().longValue();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("is not empty"));
+ }
+ }
}