Add the GraphJob example, which is PageRank, to work on YARN cluster.
git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1701975 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 637ad91..dede15f 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -42,6 +42,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.hama</groupId>
+ <artifactId>hama-graph</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
diff --git a/yarn/src/main/java/org/apache/hama/bsp/PageRankonYarn.java b/yarn/src/main/java/org/apache/hama/bsp/PageRankonYarn.java
new file mode 100644
index 0000000..9db6e56
--- /dev/null
+++ b/yarn/src/main/java/org/apache/hama/bsp/PageRankonYarn.java
@@ -0,0 +1,217 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.commons.cli.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.graph.*;
+import org.json.simple.JSONArray;
+import org.json.simple.parser.JSONParser;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class PageRankonYarn {
+ public static class PageRankVertex extends
+ Vertex<Text, NullWritable, DoubleWritable> {
+
+ static double DAMPING_FACTOR = 0.85;
+ static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
+
+ @Override
+ public void setup(HamaConfiguration conf) {
+ String val = conf.get("hama.pagerank.alpha");
+ if (val != null) {
+ DAMPING_FACTOR = Double.parseDouble(val);
+ }
+ val = conf.get("hama.graph.max.convergence.error");
+ if (val != null) {
+ MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
+ }
+
+ // initialize this vertex to 1 / count of global vertices in this graph
+ setValue(new DoubleWritable(1.0 / getTotalNumVertices()));
+ }
+
+ @Override
+ public void compute(Iterable<DoubleWritable> messages) throws IOException {
+ if (this.getSuperstepCount() >= 1) {
+ double sum = 0;
+ for (DoubleWritable msg : messages) {
+ sum += msg.get();
+ }
+ double alpha = (1.0d - DAMPING_FACTOR) / getTotalNumVertices();
+ setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
+ aggregate(0, this.getValue());
+ }
+
+ // if we have not reached our global error yet, then proceed.
+ DoubleWritable globalError = getAggregatedValue(0);
+
+ if (globalError != null && this.getSuperstepCount() > 2
+ && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
+ voteToHalt();
+ } else {
+ // in each superstep we are going to send a new rank to our neighbours
+ sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
+ / this.getEdges().size()));
+ }
+ }
+ }
+
+  /** Reads records of the form: vertex id, a tab, then space separated neighbour ids. */
+  public static class PagerankTextReader extends
+      VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
+      String[] tokenArray = value.toString().split("\t");
+      vertex.setVertexID(new Text(tokenArray[0].trim()));
+      // A line with no edge column describes a vertex without outgoing edges,
+      // so tokenArray[1] must not be read unconditionally.
+      String edgeList = tokenArray.length > 1 ? tokenArray[1].trim() : "";
+      for (String v : edgeList.split(" ")) {
+        if (!v.isEmpty()) {
+          vertex.addEdge(new Edge<Text, NullWritable>(new Text(v), null));
+        }
+      }
+
+      return true;
+    }
+  }
+
+  /** Reads JSON array records: element 0 is the vertex id, element 2 the edge list. */
+  public static class PagerankJsonReader extends
+      VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
+      JSONParser parser = new JSONParser();
+      JSONArray record = (JSONArray) parser.parse(value.toString());
+      vertex.setVertexID(new Text(record.get(0).toString()));
+
+      // each entry of the edge list is itself an array whose first element
+      // names the target vertex
+      JSONArray edgeList = (JSONArray) record.get(2);
+      for (Iterator<JSONArray> it = edgeList.iterator(); it.hasNext();) {
+        String target = it.next().get(0).toString();
+        vertex.addEdge(new Edge<Text, NullWritable>(new Text(target), null));
+      }
+
+      return true;
+    }
+  }
+
+  /**
+   * Builds and configures the PageRank job from the command line arguments.
+   *
+   * @throws ParseException if the arguments cannot be parsed, or if the
+   *           mandatory "i" (input) or "o" (output) option is missing
+   */
+  public static YARNGraphJob createJob(String[] args, HamaConfiguration conf,
+      Options opts) throws IOException, ParseException {
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+    if (!cliParser.hasOption("i") || !cliParser.hasOption("o")) {
+      throw new ParseException(
+          "No input or output path specified for PageRank, exiting.");
+    }
+
+    YARNGraphJob pageJob = new YARNGraphJob(conf, PageRankonYarn.class);
+    pageJob.setJobName("Pagerank");
+    pageJob.setVertexClass(PageRankVertex.class);
+    pageJob.setInputPath(new Path(cliParser.getOptionValue("i")));
+    pageJob.setOutputPath(new Path(cliParser.getOptionValue("o")));
+
+    // set the defaults
+    pageJob.setMaxIteration(30);
+    pageJob.set("hama.pagerank.alpha", "0.85");
+    // reference vertices to itself, because we don't have a dangling node
+    // contribution here
+    pageJob.set("hama.graph.self.ref", "true");
+    pageJob.set("hama.graph.max.convergence.error", "0.001");
+    if (cliParser.hasOption("t")) {
+      pageJob.setNumBspTask(Integer.parseInt(cliParser.getOptionValue("t")));
+    }
+
+    // error
+    pageJob.setAggregatorClass(AverageAggregator.class);
+    // Vertex reader
+    // According to file type, which is Text or Json,
+    // Vertex reader handle it differently.
+    if (cliParser.hasOption("f")) {
+      if (cliParser.getOptionValue("f").equals("text")) {
+        pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+      } else if (cliParser.getOptionValue("f").equals("json")) {
+        pageJob.setVertexInputReaderClass(PagerankJsonReader.class);
+      } else {
+        System.out.println("File type is not available to run Pagerank... "
+            + "File type set default value, Text.");
+        pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+      }
+    } else {
+      pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+    }
+
+    pageJob.setVertexIDClass(Text.class);
+    pageJob.setVertexValueClass(DoubleWritable.class);
+    pageJob.setEdgeValueClass(NullWritable.class);
+    pageJob.setInputFormat(TextInputFormat.class);
+    pageJob.setInputKeyClass(LongWritable.class);
+    pageJob.setInputValueClass(Text.class);
+    pageJob.setPartitioner(HashPartitioner.class);
+    pageJob.setOutputFormat(TextOutputFormat.class);
+    pageJob.setOutputKeyClass(Text.class);
+    pageJob.setOutputValueClass(DoubleWritable.class);
+    return pageJob;
+  }
+
+  /** Command line entry point: declares the options, builds the job and runs it. */
+  public static void main(String[] args) throws IOException,
+      InterruptedException, ClassNotFoundException, ParseException {
+    Options opts = new Options();
+    opts.addOption("i", "input_path", true, "The Location of input path.");
+    opts.addOption("o", "output_path", true, "The Location of output path.");
+    opts.addOption("h", "help", false, "Print usage");
+    opts.addOption("t", "task_num", true, "The number of tasks.");
+    opts.addOption("f", "file_type", true, "The file type of input data. Input "
+        + "file format which is \"text\" tab delimiter separated or \"json\". "
+        + "Default value - Text");
+
+    if (args.length < 2) {
+      new HelpFormatter().printHelp("pagerank -i INPUT_PATH -o OUTPUT_PATH "
+          + "[-t NUM_TASKS] [-f FILE_TYPE]", opts);
+      System.exit(-1);
+    }
+
+    HamaConfiguration conf = new HamaConfiguration();
+    YARNGraphJob pageJob = createJob(args, conf, opts);
+    long startTime = System.currentTimeMillis();
+    if (pageJob.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    }
+  }
+}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/YARNGraphJob.java b/yarn/src/main/java/org/apache/hama/bsp/YARNGraphJob.java
new file mode 100644
index 0000000..483f912
--- /dev/null
+++ b/yarn/src/main/java/org/apache/hama/bsp/YARNGraphJob.java
@@ -0,0 +1,196 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.graph.*;
+
+import java.io.IOException;
+
+public class YARNGraphJob extends YARNBSPJob {
+ public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class";
+ public final static String VERTEX_ID_CLASS_ATTR = "hama.graph.vertex.id.class";
+ public final static String VERTEX_VALUE_CLASS_ATTR = "hama.graph.vertex.value.class";
+ public final static String VERTEX_EDGE_VALUE_CLASS_ATTR = "hama.graph.vertex.edge.value.class";
+ // Keys for the output writer class and the ';'-terminated aggregator class list.
+ public final static String VERTEX_OUTPUT_WRITER_CLASS_ATTR = "hama.graph.vertex.output.writer.class";
+ public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
+
+ public YARNGraphJob(HamaConfiguration conf, Class<?> exampleClass)
+ throws IOException {
+ super(conf); // NOTE(review): the conf writes below target the caller's object — confirm the parent shares it rather than copying
+ conf.setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
+ OutgoingVertexMessageManager.class, OutgoingMessageManager.class);
+ conf.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
+ conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, false);
+ conf.setBoolean("hama.use.unsafeserialization", true);
+ // Defaults applied through the job's own setters; callers can call the setters again afterwards.
+ this.setBspClass(GraphJobRunner.class);
+ this.setJarByClass(exampleClass);
+ this.setVertexIDClass(Text.class);
+ this.setVertexValueClass(IntWritable.class);
+ this.setEdgeValueClass(IntWritable.class);
+ this.setPartitioner(HashPartitioner.class);
+ this.setMessageQueueBehaviour(MessageQueue.PERSISTENT_QUEUE);
+ }
+
+ /**
+ * Sets the Vertex class for the job. Also registers the same class as the
+ * input key class and NullWritable as the input value class.
+ */
+ public void setVertexClass(Class<? extends
+ Vertex<? extends Writable, ? extends Writable, ? extends Writable>> cls)
+ throws IllegalStateException {
+ conf.setClass(VERTEX_CLASS_ATTR, cls, Vertex.class);
+ setInputKeyClass(cls);
+ setInputValueClass(NullWritable.class);
+ }
+
+ /**
+ * Sets the Vertex ID class for the job, stored under VERTEX_ID_CLASS_ATTR.
+ */
+ public void setVertexIDClass(Class<? extends Writable> cls)
+ throws IllegalStateException {
+ conf.setClass(VERTEX_ID_CLASS_ATTR, cls, Writable.class);
+ }
+
+ /**
+ * Sets the Vertex value class for the job, stored under VERTEX_VALUE_CLASS_ATTR.
+ */
+ public void setVertexValueClass(Class<? extends Writable> cls)
+ throws IllegalStateException {
+ conf.setClass(VERTEX_VALUE_CLASS_ATTR, cls, Writable.class);
+ }
+
+ /**
+ * Sets the Edge value class for the job, stored under VERTEX_EDGE_VALUE_CLASS_ATTR.
+ */
+ public void setEdgeValueClass(Class<? extends Writable> cls)
+ throws IllegalStateException {
+ conf.setClass(VERTEX_EDGE_VALUE_CLASS_ATTR, cls, Writable.class);
+ }
+
+ /**
+ * Sets a single aggregator for the job by delegating to the varargs overload.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void setAggregatorClass(Class<? extends Aggregator> cls) {
+ this.setAggregatorClass(new Class[] { cls });
+ }
+
+  /**
+   * Sets multiple aggregators; their names are stored as a ';'-terminated list.
+   */
+  @SuppressWarnings("rawtypes")
+  public void setAggregatorClass(Class<? extends Aggregator>... cls) {
+    StringBuilder classNames = new StringBuilder();
+    for (Class<? extends Aggregator> cl : cls) {
+      classNames.append(cl.getName()).append(';');
+    }
+    conf.set(AGGREGATOR_CLASS_ATTR, classNames.toString());
+  }
+
+ /**
+ * Sets the reader that parses input records into vertices; submit() requires it.
+ */
+ public void setVertexInputReaderClass(
+ @SuppressWarnings("rawtypes") Class<? extends VertexInputReader> cls) {
+ ensureState(JobState.DEFINE);
+ conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls,
+ PartitioningRunner.RecordConverter.class);
+ }
+
+ /**
+ * Sets the output writer for materializing vertices to the output sink. If
+ * none is set, submit() falls back to DefaultVertexOutputWriter.
+ */
+ public void setVertexOutputWriterClass(
+ @SuppressWarnings("rawtypes") Class<? extends VertexOutputWriter> cls) {
+ ensureState(JobState.DEFINE);
+ conf.setClass(VERTEX_OUTPUT_WRITER_CLASS_ATTR, cls,
+ VertexOutputWriter.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>> getVertexClass() {
+ return (Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>>) conf
+ .getClass(VERTEX_CLASS_ATTR, Vertex.class);
+ }
+
+ @Override
+ public void setPartitioner(
+ @SuppressWarnings("rawtypes") Class<? extends Partitioner> theClass) {
+ super.setPartitioner(theClass); // pure delegation to the parent implementation
+ }
+
+ @Override
+ public void setCombinerClass(
+ Class<? extends Combiner<? extends Writable>> cls) {
+ ensureState(JobState.DEFINE); // NOTE(review): presumably rejects calls outside the DEFINE state — confirm in the parent class
+ conf.setClass(Constants.COMBINER_CLASS, cls, Combiner.class);
+ }
+
+ /**
+ * Sets the maximum number of iterations the algorithm should perform. The
+ * default value, -1, means the limit is deactivated.
+ */
+ public void setMaxIteration(int maxIteration) {
+ conf.setInt("hama.graph.max.iteration", maxIteration);
+ }
+
+  /**
+   * Rejects the job with an IllegalArgumentException if a mandatory graph class
+   * is unset, applies the output writer and queue defaults, then submits it.
+   */
+  @Override
+  public void submit() throws IOException, InterruptedException {
+    Preconditions
+        .checkArgument(this.getConfiguration().get(VERTEX_CLASS_ATTR) != null,
+            "Please provide a vertex class!");
+    Preconditions.checkArgument(
+        this.getConfiguration().get(VERTEX_ID_CLASS_ATTR) != null,
+        "Please provide a vertex ID class!");
+    Preconditions.checkArgument(
+        this.getConfiguration().get(VERTEX_VALUE_CLASS_ATTR) != null,
+        "Please provide a vertex value class, if you don't need one, use NullWritable!");
+    Preconditions.checkArgument(
+        this.getConfiguration().get(VERTEX_EDGE_VALUE_CLASS_ATTR) != null,
+        "Please provide an edge value class, if you don't need one, use NullWritable!");
+    Preconditions.checkArgument(
+        this.getConfiguration().get(Constants.RUNTIME_PARTITION_RECORDCONVERTER)
+            != null,
+        "Please provide a converter class for your vertex by using YARNGraphJob#setVertexInputReaderClass!");
+    if (this.getConfiguration().get(VERTEX_OUTPUT_WRITER_CLASS_ATTR) == null) {
+      this.setVertexOutputWriterClass(DefaultVertexOutputWriter.class);
+    }
+    this.getConfiguration().setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+        IncomingVertexMessageManager.class, MessageQueue.class);
+    super.submit();
+  }
+}