Add a GraphJob example (PageRank) that runs on a YARN cluster.
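
A minimal launch sketch (the jar name and paths are illustrative, and the
standard "bin/hama jar" launcher is assumed):

  bin/hama jar hama-yarn-<version>.jar org.apache.hama.bsp.PageRankonYarn \
      -i /tmp/pagerank/input -o /tmp/pagerank/output -t 2 -f text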

git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1701975 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 637ad91..dede15f 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -42,6 +42,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.apache.hama</groupId>
+      <artifactId>hama-graph</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
     </dependency>
diff --git a/yarn/src/main/java/org/apache/hama/bsp/PageRankonYarn.java b/yarn/src/main/java/org/apache/hama/bsp/PageRankonYarn.java
new file mode 100644
index 0000000..9db6e56
--- /dev/null
+++ b/yarn/src/main/java/org/apache/hama/bsp/PageRankonYarn.java
@@ -0,0 +1,217 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.commons.cli.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.graph.*;
+import org.json.simple.JSONArray;
+import org.json.simple.parser.JSONParser;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class PageRankonYarn {
+  public static class PageRankVertex extends
+      Vertex<Text, NullWritable, DoubleWritable> {
+
+    static double DAMPING_FACTOR = 0.85;
+    static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
+
+    @Override
+    public void setup(HamaConfiguration conf) {
+      String val = conf.get("hama.pagerank.alpha");
+      if (val != null) {
+        DAMPING_FACTOR = Double.parseDouble(val);
+      }
+      val = conf.get("hama.graph.max.convergence.error");
+      if (val != null) {
+        MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
+      }
+
+      // initialize this vertex to 1 / count of global vertices in this graph
+      setValue(new DoubleWritable(1.0 / getTotalNumVertices()));
+    }
+
+    @Override
+    public void compute(Iterable<DoubleWritable> messages) throws IOException {
+      if (this.getSuperstepCount() >= 1) {
+        double sum = 0;
+        for (DoubleWritable msg : messages) {
+          sum += msg.get();
+        }
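+        // standard PageRank update: rank = (1 - d) / N + d * (sum of the
+        // incoming neighbour contributions), with d the damping factor and
+        // N the total number of vertices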
+        double alpha = (1.0d - DAMPING_FACTOR) / getTotalNumVertices();
+        setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
+        aggregate(0, this.getValue());
+      }
+
+      // halt once the aggregated global error has converged below the
+      // threshold; otherwise keep propagating rank updates
+      DoubleWritable globalError = getAggregatedValue(0);
+
+      if (globalError != null && this.getSuperstepCount() > 2
+          && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
+        voteToHalt();
+      } else {
+        // in each superstep we are going to send a new rank to our neighbours
+        sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
+            / this.getEdges().size()));
+      }
+    }
+  }
+
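+  /**
+   * Turns tab-delimited adjacency lists into vertices: the vertex ID, a tab,
+   * then the outgoing edges separated by single spaces. An illustrative
+   * sample line: "1\t2 3 4".
+   */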
+  public static class PagerankTextReader extends
+      VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
+
+      String[] tokenArray = value.toString().split("\t");
+      String vtx = tokenArray[0].trim();
+      String[] edges = tokenArray[1].trim().split(" ");
+
+      vertex.setVertexID(new Text(vtx));
+
+      for (String v : edges) {
+        vertex.addEdge(new Edge<Text, NullWritable>(new Text(v), null));
+      }
+
+      return true;
+    }
+  }
+
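+  /**
+   * Turns one JSON array per line into a vertex. Judging from the indices
+   * read below, a line has the form ["vertexID", <unused>, [["edge1"],
+   * ["edge2"], ...]]; an illustrative sample is ["1", 0, [["2"], ["3"]]].
+   */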
+  public static class PagerankJsonReader extends
+      VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
+      JSONArray jsonArray = (JSONArray) new JSONParser()
+          .parse(value.toString());
+
+      vertex.setVertexID(new Text(jsonArray.get(0).toString()));
+
+      Iterator<JSONArray> iter = ((JSONArray) jsonArray.get(2)).iterator();
+      while (iter.hasNext()) {
+        JSONArray edge = iter.next();
+        vertex.addEdge(new Edge<Text, NullWritable>(new Text(edge.get(0)
+            .toString()), null));
+      }
+
+      return true;
+    }
+  }
+
+  public static YARNGraphJob createJob(String[] args, HamaConfiguration conf,
+      Options opts) throws IOException, ParseException {
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+
+    if (!cliParser.hasOption("i") || !cliParser.hasOption("o")) {
+      System.out
+          .println("No input or output path specified for PageRank, exiting.");
+      System.exit(-1);
+    }
+
+    YARNGraphJob pageJob = new YARNGraphJob(conf, PageRankonYarn.class);
+    pageJob.setJobName("Pagerank");
+
+    pageJob.setVertexClass(PageRankVertex.class);
+    pageJob.setInputPath(new Path(cliParser.getOptionValue("i")));
+    pageJob.setOutputPath(new Path(cliParser.getOptionValue("o")));
+
+    // set the defaults
+    pageJob.setMaxIteration(30);
+    pageJob.set("hama.pagerank.alpha", "0.85");
+    // reference vertices to itself, because we don't have a dangling node
+    // contribution here
+    pageJob.set("hama.graph.self.ref", "true");
+    pageJob.set("hama.graph.max.convergence.error", "0.001");
+
+    if (cliParser.hasOption("t")) {
+      pageJob.setNumBspTask(Integer.parseInt(cliParser.getOptionValue("t")));
+    }
+
+    // the AverageAggregator tracks the average absolute change of the ranks
+    // between supersteps; compute() reads it back as the convergence error
+    pageJob.setAggregatorClass(AverageAggregator.class);
+
+    // Vertex reader
+    // Depending on the input file type ("text" or "json"), a different
+    // vertex input reader is used.
+    if (cliParser.hasOption("f")) {
+      if (cliParser.getOptionValue("f").equals("text")) {
+        pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+      } else if (cliParser.getOptionValue("f").equals("json")) {
+        pageJob.setVertexInputReaderClass(PagerankJsonReader.class);
+      } else {
+        System.out.println("File type is not available to run Pagerank... "
+            + "File type set default value, Text.");
+        pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+      }
+    } else {
+      pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+    }
+
+    pageJob.setVertexIDClass(Text.class);
+    pageJob.setVertexValueClass(DoubleWritable.class);
+    pageJob.setEdgeValueClass(NullWritable.class);
+
+    pageJob.setInputFormat(TextInputFormat.class);
+    pageJob.setInputKeyClass(LongWritable.class);
+    pageJob.setInputValueClass(Text.class);
+
+    pageJob.setPartitioner(HashPartitioner.class);
+    pageJob.setOutputFormat(TextOutputFormat.class);
+    pageJob.setOutputKeyClass(Text.class);
+    pageJob.setOutputValueClass(DoubleWritable.class);
+    return pageJob;
+  }
+
+  public static void main(String[] args) throws IOException,
+      InterruptedException, ClassNotFoundException, ParseException {
+    Options opts = new Options();
+    opts.addOption("i", "input_path", true, "The Location of output path.");
+    opts.addOption("o", "output_path", true, "The Location of input path.");
+    opts.addOption("h", "help", false, "Print usage");
+    opts.addOption("t", "task_num", true, "The number of tasks.");
+    opts.addOption("f", "file_type", true, "The file type of input data. Input"
+        + "file format which is \"text\" tab delimiter separated or \"json\"."
+        + "Default value - Text");
+
+    if (args.length < 2) {
+      new HelpFormatter().printHelp("pagerank -i INPUT_PATH -o OUTPUT_PATH "
+          + "[-t NUM_TASKS] [-f FILE_TYPE]", opts);
+      System.exit(-1);
+    }
+
+    HamaConfiguration conf = new HamaConfiguration();
+    YARNGraphJob pageJob = createJob(args, conf, opts);
+
+    long startTime = System.currentTimeMillis();
+    if (pageJob.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    }
+  }
+}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/YARNGraphJob.java b/yarn/src/main/java/org/apache/hama/bsp/YARNGraphJob.java
new file mode 100644
index 0000000..483f912
--- /dev/null
+++ b/yarn/src/main/java/org/apache/hama/bsp/YARNGraphJob.java
@@ -0,0 +1,196 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.graph.*;
+
+import java.io.IOException;
+
+public class YARNGraphJob extends YARNBSPJob {
+  public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class";
+  public final static String VERTEX_ID_CLASS_ATTR = "hama.graph.vertex.id.class";
+  public final static String VERTEX_VALUE_CLASS_ATTR = "hama.graph.vertex.value.class";
+  public final static String VERTEX_EDGE_VALUE_CLASS_ATTR = "hama.graph.vertex.edge.value.class";
+
+  public final static String VERTEX_OUTPUT_WRITER_CLASS_ATTR = "hama.graph.vertex.output.writer.class";
+  public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
+
+  public YARNGraphJob(HamaConfiguration conf, Class<?> exampleClass)
+      throws IOException {
+    super(conf);
+    conf.setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
+        OutgoingVertexMessageManager.class, OutgoingMessageManager.class);
+    conf.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
+    conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, false);
+    conf.setBoolean("hama.use.unsafeserialization", true);
+
+    this.setBspClass(GraphJobRunner.class);
+    this.setJarByClass(exampleClass);
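+    // the type and partitioner settings below are defaults only; jobs such
+    // as PageRankonYarn override them through the public setters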
+    this.setVertexIDClass(Text.class);
+    this.setVertexValueClass(IntWritable.class);
+    this.setEdgeValueClass(IntWritable.class);
+    this.setPartitioner(HashPartitioner.class);
+    this.setMessageQueueBehaviour(MessageQueue.PERSISTENT_QUEUE);
+  }
+
+  /**
+   * Set the Vertex class for the job.
+   */
+  public void setVertexClass(Class<? extends
+      Vertex<? extends Writable, ? extends Writable, ? extends Writable>> cls)
+      throws IllegalStateException {
+    conf.setClass(VERTEX_CLASS_ATTR, cls, Vertex.class);
+    setInputKeyClass(cls);
+    setInputValueClass(NullWritable.class);
+  }
+
+  /**
+   * Set the Vertex ID class for the job.
+   */
+  public void setVertexIDClass(Class<? extends Writable> cls)
+      throws IllegalStateException {
+    conf.setClass(VERTEX_ID_CLASS_ATTR, cls, Writable.class);
+  }
+
+  /**
+   * Set the Vertex value class for the job.
+   */
+  public void setVertexValueClass(Class<? extends Writable> cls)
+      throws IllegalStateException {
+    conf.setClass(VERTEX_VALUE_CLASS_ATTR, cls, Writable.class);
+  }
+
+  /**
+   * Set the Edge value class for the job.
+   */
+  public void setEdgeValueClass(Class<? extends Writable> cls)
+      throws IllegalStateException {
+    conf.setClass(VERTEX_EDGE_VALUE_CLASS_ATTR, cls, Writable.class);
+  }
+
+  /**
+   * Set the aggregator for the job.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void setAggregatorClass(Class<? extends Aggregator> cls) {
+    this.setAggregatorClass(new Class[] { cls });
+  }
+
+  /**
+   * Sets multiple aggregators for the job.
+   */
+  @SuppressWarnings("rawtypes")
+  public void setAggregatorClass(Class<? extends Aggregator>... cls) {
+    StringBuilder classNames = new StringBuilder();
+    for (Class<? extends Aggregator> cl : cls) {
+      classNames.append(cl.getName()).append(';');
+    }
+    conf.set(AGGREGATOR_CLASS_ATTR, classNames.toString());
+  }
+
+  /**
+   * Sets the input reader for parsing the input to vertices.
+   */
+  public void setVertexInputReaderClass(
+      @SuppressWarnings("rawtypes") Class<? extends VertexInputReader> cls) {
+    ensureState(JobState.DEFINE);
+    conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls,
+        PartitioningRunner.RecordConverter.class);
+  }
+
+  /**
+   * Sets the output writer for materializing vertices to the output sink. If
+   * not set, the default DefaultVertexOutputWriter will be used.
+   */
+  public void setVertexOutputWriterClass(
+      @SuppressWarnings("rawtypes") Class<? extends VertexOutputWriter> cls) {
+    ensureState(JobState.DEFINE);
+    conf.setClass(VERTEX_OUTPUT_WRITER_CLASS_ATTR, cls,
+        VertexOutputWriter.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  public Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>> getVertexClass() {
+    return (Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>>) conf
+        .getClass(VERTEX_CLASS_ATTR, Vertex.class);
+  }
+
+  @Override
+  public void setPartitioner(
+      @SuppressWarnings("rawtypes") Class<? extends Partitioner> theClass) {
+    super.setPartitioner(theClass);
+  }
+
+  @Override
+  public void setCombinerClass(
+      Class<? extends Combiner<? extends Writable>> cls) {
+    ensureState(JobState.DEFINE);
+    conf.setClass(Constants.COMBINER_CLASS, cls, Combiner.class);
+  }
+
+  /**
+   * Sets how many iterations the algorithm should perform; the default is -1
+   * (deactivated).
+   */
+  public void setMaxIteration(int maxIteration) {
+    conf.setInt("hama.graph.max.iteration", maxIteration);
+  }
+
+  @Override
+  public void submit() throws IOException, InterruptedException {
+    Preconditions
+        .checkArgument(this.getConfiguration().get(VERTEX_CLASS_ATTR) != null,
+            "Please provide a vertex class!");
+    Preconditions.checkArgument(
+        this.getConfiguration().get(VERTEX_ID_CLASS_ATTR) != null,
+        "Please provide an vertex ID class!");
+    Preconditions.checkArgument(
+        this.getConfiguration().get(VERTEX_VALUE_CLASS_ATTR) != null,
+        "Please provide an vertex value class, if you don't need one, use NullWritable!");
+    Preconditions.checkArgument(
+        this.getConfiguration().get(VERTEX_EDGE_VALUE_CLASS_ATTR) != null,
+        "Please provide an edge value class, if you don't need one, use NullWritable!");
+
+    Preconditions.checkArgument(
+        this.getConfiguration().get(Constants.RUNTIME_PARTITION_RECORDCONVERTER)
+            != null,
+        "Please provide a converter class for your vertex by using GraphJob#setVertexInputReaderClass!");
+
+    if (this.getConfiguration().get(VERTEX_OUTPUT_WRITER_CLASS_ATTR) == null) {
+      this.setVertexOutputWriterClass(DefaultVertexOutputWriter.class);
+    }
+
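+    // receive incoming traffic through the graph-specific vertex message
+    // queue rather than the default BSP message queue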
+    this.getConfiguration().setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+        IncomingVertexMessageManager.class, MessageQueue.class);
+
+    super.submit();
+  }
+}