Merge branch '3.6-dev'
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 15bb642..817e19f 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -395,6 +395,7 @@
=== TinkerPop 3.5.7 (Release Date: NOT OFFICIALLY RELEASED YET)
* Fixed a memory leak in the Gremlin.Net driver that only occurred if a CancellationToken was provided.
+* Added `gremlin.spark.dontDeleteNonEmptyOutput` to `spark-gremlin` to prevent deletion of the output folder when it is not empty.
[[release-3-5-6]]
=== TinkerPop 3.5.6 (Release Date: May 1, 2023)
diff --git a/gremlin-python/src/main/python/README.rst b/gremlin-python/src/main/python/README.rst
index a857c2b..ea6b4d8 100644
--- a/gremlin-python/src/main/python/README.rst
+++ b/gremlin-python/src/main/python/README.rst
@@ -42,16 +42,16 @@
>>> from gremlin_python.process.anonymous_traversal import traversal
>>> from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
- >>> g = traversal().withRemote(DriverRemoteConnection('ws://localhost:8182/gremlin','g'))
+ >>> g = traversal().with_remote(DriverRemoteConnection('ws://localhost:8182/gremlin','g'))
Once "g" has been created using a connection, it is then possible to start writing Gremlin traversals to query the
remote graph:
- >>> g.V().both()[1:3].toList()
+ >>> g.V().both()[1:3].to_list()
[v[2], v[4]]
- >>> g.V().both()[1].toList()
+ >>> g.V().both()[1].to_list()
[v[2]]
- >>> g.V().both().name.toList()
+ >>> g.V().both().name.to_list()
[lop, vadas, josh, marko, marko, josh, peter, ripple, lop, marko, josh, lop]
-----------------
@@ -84,7 +84,7 @@
def create_vertex(self, vid, vlabel):
# default database cardinality is used when Cardinality argument is not specified
- g.addV(vlabel).property(id, vid). \
+ g.add_v(vlabel).property(id, vid). \
property(single, 'name', 'Apache'). \
property('lastname', 'Tinkerpop'). \
next()
@@ -95,13 +95,13 @@
.. code:: python
def list_all(self, limit=500):
- g.V().limit(limit).elementMap().toList()
+ g.V().limit(limit).element_map().to_list()
def find_vertex(self, vid):
- g.V(vid).elementMap().next()
+ g.V(vid).element_map().next()
def list_by_label_name(self, vlabel, name):
- g.V().has(vlabel, 'name', name).elementMap().toList()
+ g.V().has(vlabel, 'name', name).element_map().to_list()
Update Vertex
^^^^^^^^^^^^^
diff --git a/gremlin-python/src/main/python/example.py b/gremlin-python/src/main/python/example.py
index b3a3340..64e0c0b 100644
--- a/gremlin-python/src/main/python/example.py
+++ b/gremlin-python/src/main/python/example.py
@@ -32,7 +32,7 @@
#
# which starts it in "console" mode with an empty in-memory TinkerGraph ready to go bound to a
# variable named "g" as referenced in the following line.
- g = traversal().withRemote(DriverRemoteConnection('ws://localhost:8182/gremlin', 'g'))
+ g = traversal().with_remote(DriverRemoteConnection('ws://localhost:8182/gremlin', 'g'))
# add some data - be sure to use a terminating step like iterate() so that the traversal
# "executes". iterate() does not return any data and is used to just generate side-effects
diff --git a/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py b/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py
index ed4e0ca..b2cf4ea 100644
--- a/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py
+++ b/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py
@@ -134,7 +134,7 @@
val = True if v is None else v
if options_strategy is None:
options_strategy = OptionsStrategy({k: val})
- source = self.withStrategies(options_strategy)
+ source = self.with_strategies(options_strategy)
else:
options_strategy[1].configuration[k] = val
@@ -1604,7 +1604,7 @@
def addE(*args):
- return __.addE(*args)
+ return __.add_e(*args)
def add_e(*args):
@@ -1612,7 +1612,7 @@
def addV(*args):
- return __.addV(*args)
+ return __.add_v(*args)
def add_v(*args):
@@ -1640,7 +1640,7 @@
def bothE(*args):
- return __.bothE(*args)
+ return __.both_e(*args)
def both_e(*args):
@@ -1648,7 +1648,7 @@
def bothV(*args):
- return __.bothV(*args)
+ return __.both_v(*args)
def both_v(*args):
@@ -1688,7 +1688,7 @@
def cyclicPath(*args):
- return __.cyclicPath(*args)
+ return __.cyclic_path(*args)
def cyclic_path(*args):
@@ -1708,7 +1708,7 @@
def elementMap(*args):
- return __.elementMap(*args)
+ return __.element_map(*args)
def element_map(*args):
@@ -1728,7 +1728,7 @@
def flatMap(*args):
- return __.flatMap(*args)
+ return __.flat_map(*args)
def flat_map(*args):
@@ -1744,7 +1744,7 @@
def groupCount(*args):
- return __.groupCount(*args)
+ return __.group_count(*args)
def group_count(*args):
@@ -1756,7 +1756,7 @@
def hasId(*args):
- return __.hasId(*args)
+ return __.has_id(*args)
def has_id(*args):
@@ -1764,7 +1764,7 @@
def hasKey(*args):
- return __.hasKey(*args)
+ return __.has_key_(*args)
def has_key_(*args):
@@ -1772,7 +1772,7 @@
def hasLabel(*args):
- return __.hasLabel(*args)
+ return __.has_label(*args)
def has_label(*args):
@@ -1780,7 +1780,7 @@
def hasNot(*args):
- return __.hasNot(*args)
+ return __.has_not(*args)
def has_not(*args):
@@ -1788,7 +1788,7 @@
def hasValue(*args):
- return __.hasValue(*args)
+ return __.has_value(*args)
def has_value(*args):
@@ -1804,7 +1804,7 @@
def inE(*args):
- return __.inE(*args)
+ return __.in_e(*args)
def in_e(*args):
@@ -1812,7 +1812,7 @@
def inV(*args):
- return __.inV(*args)
+ return __.in_v(*args)
def in_v(*args):
@@ -1904,7 +1904,7 @@
def otherV(*args):
- return __.otherV(*args)
+ return __.other_v(*args)
def other_v(*args):
@@ -1916,7 +1916,7 @@
def outE(*args):
- return __.outE(*args)
+ return __.out_e(*args)
def out_e(*args):
@@ -1924,7 +1924,7 @@
def outV(*args):
- return __.outV(*args)
+ return __.out_v(*args)
def out_v(*args):
@@ -1948,7 +1948,7 @@
def propertyMap(*args):
- return __.propertyMap(*args)
+ return __.property_map(*args)
def property_map(*args):
@@ -1976,7 +1976,7 @@
def sideEffect(*args):
- return __.sideEffect(*args)
+ return __.side_effect(*args)
def side_effect(*args):
@@ -1984,7 +1984,7 @@
def simplePath(*args):
- return __.simplePath(*args)
+ return __.simple_path(*args)
def simple_path(*args):
@@ -2012,7 +2012,7 @@
def timeLimit(*args):
- return __.timeLimit(*args)
+ return __.time_limit(*args)
def time_limit(*args):
@@ -2028,7 +2028,7 @@
def toE(*args):
- return __.toE(*args)
+ return __.to_e(*args)
def to_e(*args):
@@ -2036,7 +2036,7 @@
def toV(*args):
- return __.toV(*args)
+ return __.to_v(*args)
def to_v(*args):
@@ -2064,7 +2064,7 @@
def valueMap(*args):
- return __.valueMap(*args)
+ return __.value_map(*args)
def value_map(*args):
diff --git a/gremlin-python/src/main/python/gremlin_python/process/traversal.py b/gremlin-python/src/main/python/gremlin_python/process/traversal.py
index a293bd5..dc259d3 100644
--- a/gremlin-python/src/main/python/gremlin_python/process/traversal.py
+++ b/gremlin-python/src/main/python/gremlin_python/process/traversal.py
@@ -77,7 +77,7 @@
def iterate(self):
self.bytecode.add_step("none")
while True:
- try: self.nextTraverser()
+ try: self.next_traverser()
except StopIteration: return self
def nextTraverser(self):
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 7519b8f..eb2f94c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -73,6 +73,7 @@
public static final String GREMLIN_SPARK_PERSIST_STORAGE_LEVEL = "gremlin.spark.persistStorageLevel";
public static final String GREMLIN_SPARK_SKIP_PARTITIONER = "gremlin.spark.skipPartitioner"; // don't partition the loadedGraphRDD
public static final String GREMLIN_SPARK_SKIP_GRAPH_CACHE = "gremlin.spark.skipGraphCache"; // don't cache the loadedGraphRDD (ignores graphStorageLevel)
+ public static final String GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT = "gremlin.spark.dontDeleteNonEmptyOutput"; // don't delete the output if it is not empty
public static final String SPARK_SERIALIZER = "spark.serializer";
public static final String SPARK_KRYO_REGISTRATOR = "spark.kryo.registrator";
public static final String SPARK_KRYO_REGISTRATION_REQUIRED = "spark.kryo.registrationRequired";
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 62171ae..5f45583 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -315,10 +315,22 @@
SparkMemory memory = null;
// delete output location
+ final boolean dontDeleteNonEmptyOutput =
+ graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, false);
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
if (null != outputLocation) {
- if (outputToHDFS && fileSystemStorage.exists(outputLocation))
- fileSystemStorage.rm(outputLocation);
+ if (outputToHDFS && fileSystemStorage.exists(outputLocation)) {
+ if (dontDeleteNonEmptyOutput) {
+ // DON'T delete the content if the folder is not empty
+ if (fileSystemStorage.ls(outputLocation).size() == 0) {
+ fileSystemStorage.rm(outputLocation);
+ } else {
+ throw new IllegalStateException("The output location '" + outputLocation + "' is not empty");
+ }
+ } else {
+ fileSystemStorage.rm(outputLocation);
+ }
+ }
if (outputToSpark && sparkContextStorage.exists(outputLocation))
sparkContextStorage.rm(outputLocation);
}
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 6c1efb1..c428081 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -30,6 +30,7 @@
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
@@ -108,7 +109,14 @@
put(Graph.GRAPH, HadoopGraph.class.getName());
put(Constants.GREMLIN_HADOOP_GRAPH_READER, graphSONInput ? GraphSONInputFormat.class.getCanonicalName() : GryoInputFormat.class.getCanonicalName());
put(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
- put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, getWorkingDirectory());
+ // clear the output location if it is not empty
+ String outputLocation = getWorkingDirectory();
+ FileSystemStorage fileSystemStorage = FileSystemStorage.open();
+ if (fileSystemStorage.ls(outputLocation).size() > 0) {
+ fileSystemStorage.rm(outputLocation);
+ outputLocation = getWorkingDirectory();
+ }
+ put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); // this makes the test suite go really fast
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java
index cd57d42..3ac4369 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCloneVertexProgramInterceptorTest.java
@@ -94,4 +94,50 @@
assertEquals(totalV, totalVRef);
assertEquals(totalE, totalERef);
}
+
+ @Test
+ public void shouldStopPurgingOfExistingNonEmptyFolder() throws Exception {
+ // Build the random graph
+ final TinkerGraph randomGraph = TinkerGraph.open();
+ final int totalVertices = 200000;
+ TestHelper.createRandomGraph(randomGraph, totalVertices, 100);
+ final String inputLocation = TestHelper.makeTestDataFile(GryoSerializerIntegrateTest.class,
+ UUID.randomUUID().toString(),
+ "random-graph.kryo");
+ randomGraph.io(IoCore.gryo()).writeGraph(inputLocation);
+ randomGraph.clear();
+ randomGraph.close();
+
+ // Serialize the graph to disk by CloneVertexProgram + SparkGraphComputer
+ final String outputLocation = TestHelper.makeTestDataDirectory(GryoSerializerIntegrateTest.class, UUID.randomUUID().toString());
+ Configuration configuration1 = getBaseConfiguration();
+ configuration1.clearProperty(Constants.SPARK_SERIALIZER); // ensure proper default to GryoSerializer
+ configuration1.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation);
+ configuration1.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+ configuration1.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName());
+ configuration1.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+ configuration1.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
+ Graph graph = GraphFactory.open(configuration1);
+ graph.compute(SparkGraphComputer.class).program(CloneVertexProgram.build().create()).submit().get();
+
+ // Read the total Vertex/Edge count for golden reference through the original graph
+ Configuration configuration2 = getBaseConfiguration();
+ configuration2.clearProperty(Constants.SPARK_SERIALIZER); // ensure proper default to GryoSerializer
+ configuration2.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation);
+ configuration2.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+ configuration2.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, NullOutputFormat.class.getCanonicalName());
+ configuration2.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
+ configuration2.setProperty(Constants.GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT, true);
+ graph = GraphFactory.open(configuration2);
+ long totalVRef = graph.traversal().withComputer(SparkGraphComputer.class).V().count().next().longValue();
+
+ assertEquals(totalVRef, totalVertices);
+    // Should see an exception when reusing the previous outputLocation, which is not empty
+ graph = GraphFactory.open(configuration1);
+ try {
+ graph.traversal().withComputer(SparkGraphComputer.class).E().count().next().longValue();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("is not empty"));
+ }
+ }
}