////
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
=== Input/Output Formats

image:adjacency-list.png[width=300,float=right] Hadoop-Gremlin provides various I/O formats -- i.e. Hadoop
`InputFormat` and `OutputFormat`. All of the formats make use of an link:http://en.wikipedia.org/wiki/Adjacency_list[adjacency list]
representation of the graph where each "row" represents a single vertex, its properties, and its incoming and
outgoing edges.

{empty} +

[[gryo-io-format]]
==== Gryo I/O Format

* **InputFormat**: `org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat`
* **OutputFormat**: `org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat`

<<gryo-reader-writer,Gryo>> is a binary graph format that leverages link:https://github.com/EsotericSoftware/kryo[Kryo]
to make a compact, binary representation of a vertex. It is recommended that users leverage Gryo given its space/time
savings over text-based representations.

NOTE: The `GryoInputFormat` is splittable.
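
As a minimal sketch of how these class names are put to use, the snippet below sets them on an open graph configuration. It assumes a Gremlin Console session with the Hadoop-Gremlin plugin active, and it assumes that `gremlin.hadoop.graphReader` is the input-side counterpart of the `gremlin.hadoop.graphWriter` key; verify both against the version in use.

[source,groovy]
----
// Assumption: run in the Gremlin Console with the Hadoop-Gremlin plugin active.
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
// Read the input graph as Gryo (key name assumed; check it against your version).
graph.configuration().setProperty('gremlin.hadoop.graphReader',
    'org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat')
// Write the resulting graph as Gryo.
graph.configuration().setProperty('gremlin.hadoop.graphWriter',
    'org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat')
----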

[[graphson-io-format]]
==== GraphSON I/O Format

* **InputFormat**: `org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat`
* **OutputFormat**: `org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat`

<<graphson-reader-writer,GraphSON>> is a JSON-based graph format. Because it is text-based, GraphSON is a
space-expensive graph format. However, it is convenient for many developers to work with because its structure is
simple (easy to create and parse).

The data below is an adjacency list representation of the classic TinkerGraph toy graph in GraphSON format, with one
vertex per line.

[source,json]
----
{"id":1,"label":"person","outE":{"created":[{"id":9,"inV":3,"properties":{"weight":0.4}}],"knows":[{"id":7,"inV":2,"properties":{"weight":0.5}},{"id":8,"inV":4,"properties":{"weight":1.0}}]},"properties":{"name":[{"id":0,"value":"marko"}],"age":[{"id":1,"value":29}]}}
{"id":2,"label":"person","inE":{"knows":[{"id":7,"outV":1,"properties":{"weight":0.5}}]},"properties":{"name":[{"id":2,"value":"vadas"}],"age":[{"id":3,"value":27}]}}
{"id":3,"label":"software","inE":{"created":[{"id":9,"outV":1,"properties":{"weight":0.4}},{"id":11,"outV":4,"properties":{"weight":0.4}},{"id":12,"outV":6,"properties":{"weight":0.2}}]},"properties":{"name":[{"id":4,"value":"lop"}],"lang":[{"id":5,"value":"java"}]}}
{"id":4,"label":"person","inE":{"knows":[{"id":8,"outV":1,"properties":{"weight":1.0}}]},"outE":{"created":[{"id":10,"inV":5,"properties":{"weight":1.0}},{"id":11,"inV":3,"properties":{"weight":0.4}}]},"properties":{"name":[{"id":6,"value":"josh"}],"age":[{"id":7,"value":32}]}}
{"id":5,"label":"software","inE":{"created":[{"id":10,"outV":4,"properties":{"weight":1.0}}]},"properties":{"name":[{"id":8,"value":"ripple"}],"lang":[{"id":9,"value":"java"}]}}
{"id":6,"label":"person","outE":{"created":[{"id":12,"inV":3,"properties":{"weight":0.2}}]},"properties":{"name":[{"id":10,"value":"peter"}],"age":[{"id":11,"value":35}]}}
----
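
To make the "one row per vertex" structure concrete, the following sketch parses the first line of the sample with Groovy's `JsonSlurper` and flattens its out-edges. It is an illustration only and not how `GraphSONInputFormat` reads the data; the variable names are hypothetical.

[source,groovy]
----
import groovy.json.JsonSlurper

// Vertex 1 ("marko"), copied verbatim from the sample above.
def line = '{"id":1,"label":"person","outE":{"created":[{"id":9,"inV":3,"properties":{"weight":0.4}}],"knows":[{"id":7,"inV":2,"properties":{"weight":0.5}},{"id":8,"inV":4,"properties":{"weight":1.0}}]},"properties":{"name":[{"id":0,"value":"marko"}],"age":[{"id":1,"value":29}]}}'

def vertex = new JsonSlurper().parseText(line)

// Each property key maps to a list of value entries, so take the first entry.
def name = vertex['properties']['name'][0]['value']

// Out-edges are grouped by label; flatten them into one list of maps.
def outEdges = vertex['outE'].collectMany { label, edges ->
    edges.collect { e -> [label: label, inV: e['inV'], weight: e['properties']['weight']] }
}

println "${name} has ${outEdges.size()} outgoing edges"
outEdges.each { println "  -${it.label}-> vertex ${it.inV} (weight ${it.weight})" }
----

Vertex 1 has no incoming edges, so its row carries no `inE` key; rows such as vertex 4 carry both `inE` and `outE`.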

[[script-io-format]]
==== Script I/O Format

* **InputFormat**: `org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat`
* **OutputFormat**: `org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptOutputFormat`

`ScriptInputFormat` and `ScriptOutputFormat` take an arbitrary script and use that script to read or write
`Vertex` objects, respectively. This can be considered the most general `InputFormat`/`OutputFormat` possible in that
Hadoop-Gremlin uses the user-provided script for all reading/writing.
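
A minimal sketch of pointing a job at such scripts from the Gremlin Console follows. The property keys `gremlin.hadoop.graphReader`, `gremlin.hadoop.scriptInputFormat.script` and `gremlin.hadoop.scriptOutputFormat.script`, as well as the file names `script-input.groovy` and `script-output.groovy`, are assumptions that are not taken from this section; check them against the version in use.

[source,groovy]
----
// Assumption: run in the Gremlin Console with the Hadoop-Gremlin plugin active.
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
conf = graph.configuration()
// Read vertices with a user-provided script (key names and paths are assumed).
conf.setProperty('gremlin.hadoop.graphReader',
    'org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat')
conf.setProperty('gremlin.hadoop.scriptInputFormat.script', 'script-input.groovy')
// Write vertices with a user-provided script.
conf.setProperty('gremlin.hadoop.graphWriter',
    'org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptOutputFormat')
conf.setProperty('gremlin.hadoop.scriptOutputFormat.script', 'script-output.groovy')
----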

===== ScriptInputFormat

The data below is an adjacency list representation of the classic TinkerGraph toy graph. The first line reads,
"vertex `1`, labeled `person` having 2 property values (`marko` and `29`) has 3 outgoing edges; the first edge is
labeled `knows`, connects the current vertex `1` with vertex `2` and has a property value `0.5`, and so on."

[source]
1:person:marko:29 knows:2:0.5,knows:4:1.0,created:3:0.4
2:person:vadas:27
3:project:lop:java
4:person:josh:32 created:3:0.4,created:5:1.0
5:project:ripple:java
6:person:peter:35 created:3:0.2

There is no corresponding `InputFormat` that can parse this particular file (or some adjacency list variant of it).
As such, `ScriptInputFormat` can be used. With `ScriptInputFormat` a script is stored in HDFS and leveraged by each
mapper in the Hadoop job. The script must have the following method defined:

[source,groovy]
def parse(String line) { ... }

In order to create vertices and edges, the `parse()` method gets access to a global variable named `graph`, which holds
the local `StarGraph` for the current line/vertex.

An appropriate `parse()` for the above adjacency list file is:

[source,groovy]
def parse(line) {
    def parts = line.split(/ /)
    def (id, label, name, x) = parts[0].split(/:/).toList()
    def v1 = graph.addVertex(T.id, id, T.label, label)
    if (name != null) v1.property('name', name) // first value is always the name
    if (x != null) {
        // second value depends on the vertex label; it's either
        // the age of a person or the language of a project
        if (label.equals('project')) v1.property('lang', x)
        else v1.property('age', Integer.valueOf(x))
    }
    if (parts.length == 2) {
        parts[1].split(/,/).grep { !it.isEmpty() }.each {
            def (eLabel, refId, weight) = it.split(/:/).toList()
            def v2 = graph.addVertex(T.id, refId)
            v1.addOutEdge(eLabel, v2, 'weight', Double.valueOf(weight))
        }
    }
    return v1
}

The value returned by `parse()` indicates whether the parsed line yielded a valid `Vertex`. If the line is not valid
(e.g. a comment line, a skip line, etc.), then simply return `null`.
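
The splitting logic and the `null` convention can be tried out without Hadoop. The sketch below mirrors the tokenizing done by `parse()` but builds plain maps instead of vertices. The `tokenize()` name and the rule that blank lines and lines starting with `#` carry no vertex are hypothetical; the sample file contains neither kind of line.

[source,groovy]
----
// Dry-run counterpart of parse(): same splitting, plain maps instead of vertices.
def tokenize(String line) {
    // Assumed convention: blank lines and '#' comment lines hold no vertex.
    if (line.trim().isEmpty() || line.startsWith('#')) return null
    def parts = line.split(/ /)
    // Missing trailing fields are simply assigned null.
    def (id, label, name, x) = parts[0].split(/:/).toList()
    def edges = []
    if (parts.length == 2) {
        parts[1].split(/,/).grep { !it.isEmpty() }.each {
            def (eLabel, refId, weight) = it.split(/:/).toList()
            edges << [label: eLabel, inV: refId, weight: Double.valueOf(weight)]
        }
    }
    return [id: id, label: label, name: name, value: x, outE: edges]
}

println tokenize('1:person:marko:29 knows:2:0.5,knows:4:1.0,created:3:0.4')
println tokenize('2:person:vadas:27')
println tokenize('# not a vertex')
----

In a real `parse()`, the same early `return null` would precede the calls to `graph.addVertex()`.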

===== ScriptOutputFormat Support

The principle above can also be used to convert a vertex to an arbitrary `String` representation that is ultimately
streamed back to a file in HDFS. This is the role of `ScriptOutputFormat`. `ScriptOutputFormat` requires that the
provided script defines a method with the following signature:

[source,groovy]
def stringify(Vertex vertex) { ... }

An appropriate `stringify()` to produce output in the same format that was shown in the `ScriptInputFormat` sample is:

[source,groovy]
def stringify(vertex) {
    def v = vertex.values('name', 'age', 'lang').inject(vertex.id(), vertex.label()).join(':')
    def outE = vertex.outE().map {
        def e = it.get()
        e.values('weight').inject(e.label(), e.inV().next().id()).join(':')
    }.join(',')
    return [v, outE].join(' ')
}



=== Storage Systems

Hadoop-Gremlin provides two implementations of the `Storage` API:

* `FileSystemStorage`: Access HDFS and local file system data.
* `SparkContextStorage`: Access Spark persisted RDD data.

[[interacting-with-hdfs]]
==== Interacting with HDFS

The distributed file system of Hadoop is called link:http://en.wikipedia.org/wiki/Apache_Hadoop#Hadoop_distributed_file_system[HDFS].
The results of any OLAP operation are stored in HDFS and are accessible via `hdfs`. For local file system access, use `fs`.

[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.compute(SparkGraphComputer).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey('clusterCount').create()).submit().get();
hdfs.ls()
hdfs.ls('output')
hdfs.head('output', GryoInputFormat)
hdfs.head('output', 'clusterCount', SequenceFileInputFormat)
hdfs.rm('output')
hdfs.ls()
----

[[interacting-with-spark]]
==== Interacting with Spark

If a Spark context is persisted, then Spark RDDs will remain in the Spark cache and be accessible over subsequent jobs.
RDDs are retrieved from and saved to the `SparkContext` via `PersistedInputRDD` and `PersistedOutputRDD` respectively.
Persisted RDDs can be accessed using `spark`.

[gremlin-groovy]
----
Spark.create('local[4]')
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphWriter', PersistedOutputRDD.class.getCanonicalName())
graph.configuration().setProperty('gremlin.spark.persistContext',true)
graph.compute(SparkGraphComputer).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey('clusterCount').create()).submit().get();
spark.ls()
spark.ls('output')
spark.head('output', PersistedInputRDD)
spark.head('output', 'clusterCount', PersistedInputRDD)
spark.rm('output')
spark.ls()
----

=== A Command Line Example

image::pagerank-logo.png[width=300]

The classic link:http://en.wikipedia.org/wiki/PageRank[PageRank] centrality algorithm can be executed over the
TinkerPop graph from the command line using `GiraphGraphComputer`.

WARNING: Be sure that `HADOOP_GREMLIN_LIBS` references the `lib` directory of the respective
`GraphComputer` engine being used or else the requisite dependencies will not be uploaded to the Hadoop cluster.

[source,text]
----
$ hdfs dfs -copyFromLocal data/tinkerpop-modern.json tinkerpop-modern.json
$ hdfs dfs -ls
Found 2 items
-rw-r--r--   1 marko supergroup       2356 2014-07-28 13:00 /user/marko/tinkerpop-modern.json
$ hadoop jar target/giraph-gremlin-x.y.z-job.jar org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphGraphComputer ../hadoop-gremlin/conf/hadoop-graphson.properties
15/09/11 08:02:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/11 08:02:11 INFO computer.GiraphGraphComputer: HadoopGremlin(Giraph): PageRankVertexProgram[alpha=0.85,iterations=30]
15/09/11 08:02:12 INFO mapreduce.JobSubmitter: number of splits:3
15/09/11 08:02:12 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1441915907347_0028
15/09/11 08:02:12 INFO impl.YarnClientImpl: Submitted application application_1441915907347_0028
15/09/11 08:02:12 INFO job.GiraphJob: Tracking URL: http://markos-macbook:8088/proxy/application_1441915907347_0028/
15/09/11 08:02:12 INFO job.GiraphJob: Waiting for resources... Job will start only when it gets all 3 mappers
15/09/11 08:03:54 INFO mapreduce.Job: Running job: job_1441915907347_0028
15/09/11 08:03:55 INFO mapreduce.Job: Job job_1441915907347_0028 running in uber mode : false
15/09/11 08:03:55 INFO mapreduce.Job:  map 33% reduce 0%
15/09/11 08:03:57 INFO mapreduce.Job:  map 67% reduce 0%
15/09/11 08:04:01 INFO mapreduce.Job:  map 100% reduce 0%
15/09/11 08:06:17 INFO mapreduce.Job: Job job_1441915907347_0028 completed successfully
15/09/11 08:06:17 INFO mapreduce.Job: Counters: 80
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=483918
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1465
        HDFS: Number of bytes written=1760
        HDFS: Number of read operations=39
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=20
    Job Counters
        Launched map tasks=3
        Other local map tasks=3
        Total time spent by all maps in occupied slots (ms)=458105
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=458105
        Total vcore-seconds taken by all map tasks=458105
        Total megabyte-seconds taken by all map tasks=469099520
    Map-Reduce Framework
        Map input records=3
        Map output records=0
        Input split bytes=132
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=1594
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=527958016
    Giraph Stats
        Aggregate edges=0
        Aggregate finished vertices=0
        Aggregate sent message message bytes=13535
        Aggregate sent messages=186
        Aggregate vertices=6
        Current master task partition=0
        Current workers=2
        Last checkpointed superstep=0
        Sent message bytes=438
        Sent messages=6
        Superstep=31
    Giraph Timers
        Initialize (ms)=2996
        Input superstep (ms)=5209
        Setup (ms)=59
        Shutdown (ms)=9324
        Superstep 0 GiraphComputation (ms)=3861
        Superstep 1 GiraphComputation (ms)=4027
        Superstep 10 GiraphComputation (ms)=4000
        Superstep 11 GiraphComputation (ms)=4004
        Superstep 12 GiraphComputation (ms)=3999
        Superstep 13 GiraphComputation (ms)=4000
        Superstep 14 GiraphComputation (ms)=4005
        Superstep 15 GiraphComputation (ms)=4003
        Superstep 16 GiraphComputation (ms)=4001
        Superstep 17 GiraphComputation (ms)=4007
        Superstep 18 GiraphComputation (ms)=3998
        Superstep 19 GiraphComputation (ms)=4006
        Superstep 2 GiraphComputation (ms)=4007
        Superstep 20 GiraphComputation (ms)=3996
        Superstep 21 GiraphComputation (ms)=4006
        Superstep 22 GiraphComputation (ms)=4002
        Superstep 23 GiraphComputation (ms)=3998
        Superstep 24 GiraphComputation (ms)=4003
        Superstep 25 GiraphComputation (ms)=4001
        Superstep 26 GiraphComputation (ms)=4003
        Superstep 27 GiraphComputation (ms)=4005
        Superstep 28 GiraphComputation (ms)=4002
        Superstep 29 GiraphComputation (ms)=4001
        Superstep 3 GiraphComputation (ms)=3988
        Superstep 30 GiraphComputation (ms)=4248
        Superstep 4 GiraphComputation (ms)=4010
        Superstep 5 GiraphComputation (ms)=3998
        Superstep 6 GiraphComputation (ms)=3996
        Superstep 7 GiraphComputation (ms)=4005
        Superstep 8 GiraphComputation (ms)=4009
        Superstep 9 GiraphComputation (ms)=3994
        Total (ms)=138788
    File Input Format Counters
        Bytes Read=0
    File Output Format Counters
        Bytes Written=0
$ hdfs dfs -cat output/~g/*
{"id":1,"label":"person","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":39,"value":0.15000000000000002}],"name":[{"id":0,"value":"marko"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":10,"value":3.0}],"age":[{"id":1,"value":29}]}}
{"id":5,"label":"software","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":35,"value":0.23181250000000003}],"name":[{"id":8,"value":"ripple"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":6,"value":0.0}],"lang":[{"id":9,"value":"java"}]}}
{"id":3,"label":"software","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":39,"value":0.4018125}],"name":[{"id":4,"value":"lop"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":10,"value":0.0}],"lang":[{"id":5,"value":"java"}]}}
{"id":4,"label":"person","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":39,"value":0.19250000000000003}],"name":[{"id":6,"value":"josh"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":10,"value":2.0}],"age":[{"id":7,"value":32}]}}
{"id":2,"label":"person","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":35,"value":0.19250000000000003}],"name":[{"id":2,"value":"vadas"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":6,"value":0.0}],"age":[{"id":3,"value":27}]}}
{"id":6,"label":"person","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":35,"value":0.15000000000000002}],"name":[{"id":10,"value":"peter"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":6,"value":1.0}],"age":[{"id":11,"value":35}]}}
----

Vertex 4 ("josh") is isolated below:

[source,json]
----
{
  "id":4,
  "label":"person",
  "properties": {
    "gremlin.pageRankVertexProgram.pageRank":[{"id":39,"value":0.19250000000000003}],
    "name":[{"id":6,"value":"josh"}],
    "gremlin.pageRankVertexProgram.edgeCount":[{"id":10,"value":2.0}],
    "age":[{"id":7,"value":32}]
  }
}
----
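
As a sketch of how the computed values might be pulled out of such an output line, the following snippet parses the "josh" row with Groovy's `JsonSlurper`. It is an illustration only; the variable names are hypothetical and the input line is copied from the job output above.

[source,groovy]
----
import groovy.json.JsonSlurper

// The output row for vertex 4 ("josh"), copied from the job output above.
def line = '{"id":4,"label":"person","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":39,"value":0.19250000000000003}],"name":[{"id":6,"value":"josh"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":10,"value":2.0}],"age":[{"id":7,"value":32}]}}'

def props = new JsonSlurper().parseText(line)['properties']

// Each key maps to a list of value entries; take the first entry of each.
def name = props['name'][0]['value']
def rank = props['gremlin.pageRankVertexProgram.pageRank'][0]['value']
def edgeCount = props['gremlin.pageRankVertexProgram.edgeCount'][0]['value']

println "${name}: pageRank=${rank}, edgeCount=${edgeCount}"
----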
