fix task result with path/tree can't be serialized (#1351)

Change-Id: I5f740632f49b1305785a118a27236049cc71af41
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/io/HugeGraphSONModule.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/io/HugeGraphSONModule.java
index 1a982c4..8e214ad 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/io/HugeGraphSONModule.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/io/HugeGraphSONModule.java
@@ -24,10 +24,15 @@
 import java.util.Date;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.tinkerpop.gremlin.process.traversal.Path;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.Tree;
+import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONIo;
+import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.TinkerPopJacksonModule;
 import org.apache.tinkerpop.shaded.jackson.core.JsonGenerator;
 import org.apache.tinkerpop.shaded.jackson.core.JsonParser;
@@ -70,10 +75,10 @@
 
     private static final long serialVersionUID = 6480426922914059122L;
 
-    public static boolean OPTIMIZE_SERIALIZE = true;
-
     private static final String TYPE_NAMESPACE = "hugegraph";
 
+    private static boolean OPTIMIZE_SERIALIZE = true;
+
     @SuppressWarnings("rawtypes")
     private static final Map<Class, String> TYPE_DEFINITIONS;
 
@@ -198,6 +203,9 @@
          */
         module.addSerializer(HugeVertex.class, new HugeVertexSerializer());
         module.addSerializer(HugeEdge.class, new HugeEdgeSerializer());
+
+        module.addSerializer(Path.class, new PathSerializer());
+        module.addSerializer(Tree.class, new TreeSerializer());
     }
 
     @SuppressWarnings("rawtypes")
@@ -485,6 +493,49 @@
         }
     }
 
+    private static class PathSerializer extends StdSerializer<Path> {
+
+        public PathSerializer() {
+            super(Path.class);
+        }
+
+        @Override
+        public void serialize(Path path, JsonGenerator jsonGenerator,
+                              SerializerProvider provider) throws IOException {
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeObjectField(GraphSONTokens.LABELS,
+                                           path.labels());
+            jsonGenerator.writeObjectField(GraphSONTokens.OBJECTS,
+                                           path.objects());
+            jsonGenerator.writeEndObject();
+        }
+    }
+
+    @SuppressWarnings("rawtypes") // Tree<T>
+    private static class TreeSerializer extends StdSerializer<Tree> {
+
+        public TreeSerializer() {
+            super(Tree.class);
+        }
+
+        @Override
+        public void serialize(Tree tree, JsonGenerator jsonGenerator,
+                              SerializerProvider provider) throws IOException {
+            jsonGenerator.writeStartArray();
+            @SuppressWarnings("unchecked")
+            Set<Map.Entry<Element, Tree>> set = tree.entrySet();
+            for (Map.Entry<Element, Tree> entry : set) {
+                jsonGenerator.writeStartObject();
+                jsonGenerator.writeObjectField(GraphSONTokens.KEY,
+                                               entry.getKey());
+                jsonGenerator.writeObjectField(GraphSONTokens.VALUE,
+                                               entry.getValue());
+                jsonGenerator.writeEndObject();
+            }
+            jsonGenerator.writeEndArray();
+        }
+    }
+
     private static class ShardSerializer extends StdSerializer<Shard> {
 
         public ShardSerializer() {
@@ -493,8 +544,7 @@
 
         @Override
         public void serialize(Shard shard, JsonGenerator jsonGenerator,
-                              SerializerProvider provider)
-                              throws IOException {
+                              SerializerProvider provider) throws IOException {
             jsonGenerator.writeStartObject();
             jsonGenerator.writeStringField("start", shard.start());
             jsonGenerator.writeStringField("end", shard.end());
@@ -511,8 +561,7 @@
 
         @Override
         public void serialize(File file, JsonGenerator jsonGenerator,
-                              SerializerProvider provider)
-                              throws IOException {
+                              SerializerProvider provider) throws IOException {
             jsonGenerator.writeStartObject();
             jsonGenerator.writeStringField("file", file.getName());
             jsonGenerator.writeEndObject();
@@ -527,8 +576,7 @@
 
         @Override
         public void serialize(Blob blob, JsonGenerator jsonGenerator,
-                              SerializerProvider provider)
-                              throws IOException {
+                              SerializerProvider provider) throws IOException {
             jsonGenerator.writeBinary(blob.bytes());
         }
     }
diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/TaskCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/TaskCoreTest.java
index ed4faf2..4f8a8d9 100644
--- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/TaskCoreTest.java
+++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/TaskCoreTest.java
@@ -255,9 +255,9 @@
                 + "schema.propertyKey('lang').asText().ifNotExist().create();"
                 + "schema.propertyKey('date').asDate().ifNotExist().create();"
                 + "schema.propertyKey('price').asInt().ifNotExist().create();"
-                + "person1=schema.vertexLabel('person1').properties('name','age').ifNotExist().create();"
-                + "person2=schema.vertexLabel('person2').properties('name','age').ifNotExist().create();"
-                + "knows=schema.edgeLabel('knows').sourceLabel('person1').targetLabel('person2').properties('date').ifNotExist().create();"
+                + "schema.vertexLabel('person1').properties('name','age').ifNotExist().create();"
+                + "schema.vertexLabel('person2').properties('name','age').ifNotExist().create();"
+                + "schema.edgeLabel('knows').sourceLabel('person1').targetLabel('person2').properties('date').ifNotExist().create();"
                 + "for(int i = 0; i < 1000; i++) {"
                 + "  p1=graph.addVertex(T.label,'person1','name','p1-'+i,'age',29);"
                 + "  p2=graph.addVertex(T.label,'person2','name','p2-'+i,'age',27);"
@@ -303,6 +303,89 @@
     }
 
     @Test
+    public void testGremlinJobWithSerializedResults() throws TimeoutException {
+        HugeGraph graph = graph();
+        TaskScheduler scheduler = graph.taskScheduler();
+
+        String script = "schema=graph.schema();"
+                + "schema.propertyKey('name').asText().ifNotExist().create();"
+                + "schema.vertexLabel('char').useCustomizeNumberId().properties('name').ifNotExist().create();"
+                + "schema.edgeLabel('next').sourceLabel('char').targetLabel('char').properties('name').ifNotExist().create();"
+                + "g.addV('char').property(id,1).property('name','A').as('a')"
+                + " .addV('char').property(id,2).property('name','B').as('b')"
+                + " .addV('char').property(id,3).property('name','C').as('c')"
+                + " .addV('char').property(id,4).property('name','D').as('d')"
+                + " .addV('char').property(id,5).property('name','E').as('e')"
+                + " .addV('char').property(id,6).property('name','F').as('f')"
+                + " .addE('next').from('a').to('b').property('name','ab')"
+                + " .addE('next').from('b').to('c').property('name','bc')"
+                + " .addE('next').from('b').to('d').property('name','bd')"
+                + " .addE('next').from('c').to('d').property('name','cd')"
+                + " .addE('next').from('c').to('e').property('name','ce')"
+                + " .addE('next').from('d').to('e').property('name','de')"
+                + " .addE('next').from('e').to('f').property('name','ef')"
+                + " .addE('next').from('f').to('d').property('name','fd')"
+                + " .iterate();"
+                + "g.tx().commit(); g.E().count();";
+
+        HugeTask<Object> task = runGremlinJob(script);
+        task = scheduler.waitUntilTaskCompleted(task.id(), 10);
+        Assert.assertEquals("test-gremlin-job", task.name());
+        Assert.assertEquals("gremlin", task.type());
+        Assert.assertEquals(TaskStatus.SUCCESS, task.status());
+        Assert.assertEquals("[8]", task.result());
+
+        Id edgeLabelId = graph.schema().getEdgeLabel("next").id();
+
+        script = "g.V(1).outE().inV().path()";
+        task = runGremlinJob(script);
+        task = scheduler.waitUntilTaskCompleted(task.id(), 10);
+        Assert.assertEquals(TaskStatus.SUCCESS, task.status());
+        String expected = String.format("[{\"labels\":[[],[],[]],\"objects\":["
+                + "{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+                + "{\"id\":\"L1>%s>>L2\",\"label\":\"next\",\"type\":\"edge\",\"outV\":1,\"outVLabel\":\"char\",\"inV\":2,\"inVLabel\":\"char\",\"properties\":{\"name\":\"ab\"}},"
+                + "{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}}"
+                + "]}]", edgeLabelId);
+        Assert.assertEquals(expected, task.result());
+
+        script = "g.V(1).out().out().path()";
+        task = runGremlinJob(script);
+        task = scheduler.waitUntilTaskCompleted(task.id(), 10);
+        Assert.assertEquals(TaskStatus.SUCCESS, task.status());
+        expected = "[{\"labels\":[[],[],[]],\"objects\":["
+                + "{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+                + "{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}},"
+                + "{\"id\":3,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"C\"}}]},"
+                + "{\"labels\":[[],[],[]],\"objects\":["
+                + "{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+                + "{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}},"
+                + "{\"id\":4,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"D\"}}]}]";
+        Assert.assertEquals(expected, task.result());
+
+        script = "g.V(1).outE().inV().tree()";
+        task = runGremlinJob(script);
+        task = scheduler.waitUntilTaskCompleted(task.id(), 10);
+        Assert.assertEquals(TaskStatus.SUCCESS, task.status());
+        expected = String.format("[[{\"key\":{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+                + "\"value\":["
+                + "{\"key\":{\"id\":\"L1>%s>>L2\",\"label\":\"next\",\"type\":\"edge\",\"outV\":1,\"outVLabel\":\"char\",\"inV\":2,\"inVLabel\":\"char\",\"properties\":{\"name\":\"ab\"}},"
+                + "\"value\":[{\"key\":{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}},\"value\":[]}]}]}]]",
+                edgeLabelId);
+        Assert.assertEquals(expected, task.result());
+
+        script = "g.V(1).out().out().tree()";
+        task = runGremlinJob(script);
+        task = scheduler.waitUntilTaskCompleted(task.id(), 10);
+        Assert.assertEquals(TaskStatus.SUCCESS, task.status());
+        expected = "[[{\"key\":{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+                + "\"value\":[{\"key\":{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}},"
+                + "\"value\":["
+                + "{\"key\":{\"id\":3,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"C\"}},\"value\":[]},"
+                + "{\"key\":{\"id\":4,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"D\"}},\"value\":[]}]}]}]]";
+        Assert.assertEquals(expected, task.result());
+    }
+
+    @Test
     public void testGremlinJobWithFailure() throws TimeoutException {
         HugeGraph graph = graph();
         TaskScheduler scheduler = graph.taskScheduler();
diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/tinkerpop/TestGraph.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/tinkerpop/TestGraph.java
index 20c2cb5..22cf614 100644
--- a/hugegraph-test/src/main/java/com/baidu/hugegraph/tinkerpop/TestGraph.java
+++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/tinkerpop/TestGraph.java
@@ -44,6 +44,7 @@
 import com.baidu.hugegraph.schema.PropertyKey;
 import com.baidu.hugegraph.schema.SchemaManager;
 import com.baidu.hugegraph.task.TaskScheduler;
+import com.baidu.hugegraph.testutil.Whitebox;
 import com.baidu.hugegraph.type.define.IdStrategy;
 import com.baidu.hugegraph.type.define.NodeRole;
 import com.google.common.collect.ImmutableSet;
@@ -251,7 +252,8 @@
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     public <I extends Io> I io(final Io.Builder<I> builder) {
-        HugeGraphSONModule.OPTIMIZE_SERIALIZE = false;
+        Whitebox.setInternalState(HugeGraphSONModule.class,
+                                  "OPTIMIZE_SERIALIZE", false);
         return (I) builder.graph(this).onMapper(mapper ->
             mapper.addRegistry(HugeGraphIoRegistry.instance())
         ).create();