blob: 28d23d2b56eef0feac086a36e15cd16c3bbc7f3f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.examples;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.GraphJobRunner.GraphJobCounter;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;
/**
* NOTE: Graph modification APIs can be used only with {@link MapVerticesInfo}.
*
* This is an example of how to manipulate Graphs dynamically. The input of this
* example is a number in each row. We assume that the is a vertex with ID:1
* which is responsible to create a sum vertex that will aggregate the values of
* the other vertices. During the aggregation, sum vertex will delete all other
* vertices.
*
* Input example: 1 2 3 4
*
* Output example: sum 10
*/
public class DynamicGraph {
private static enum DYNAMIC_GRAPH_COUNTER {
ADDED_VERTEX
}
public static class GraphTextReader extends
VertexInputReader<LongWritable, Text, Text, NullWritable, IntWritable> {
@Override
public boolean parseVertex(LongWritable key, Text value,
Vertex<Text, NullWritable, IntWritable> vertex) throws Exception {
vertex.setVertexID(value);
vertex.setValue(new IntWritable(Integer.parseInt(value.toString())));
return true;
}
}
public static class GraphVertex extends
Vertex<Text, NullWritable, IntWritable> {
private void createSumVertex() throws IOException {
if (this.getVertexID().toString().equals("1")) {
Text new_id = new Text("sum");
this.addVertex(new_id, new ArrayList<Edge<Text, NullWritable>>(),
new IntWritable(0));
this.getCounter(DYNAMIC_GRAPH_COUNTER.ADDED_VERTEX).increment(1);;
}
}
private void sendAllValuesToSumAndRemove() throws IOException {
if (!this.getVertexID().toString().equals("sum")) {
this.sendMessage(new Text("sum"), this.getValue());
this.remove();
}
}
// this must run only on "sum" vertex
private void calculateSum(Iterable<IntWritable> msgs) throws IOException {
if (this.getVertexID().toString().equals("sum")) {
int s = 0;
for (IntWritable i : msgs) {
s += i.get();
}
s += this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES)
.getCounter();
this.setValue(new IntWritable(this.getValue().get() + s));
} else {
throw new UnsupportedOperationException(
"We have more vertecies than we expected: " + this.getVertexID()
+ " " + this.getValue());
}
}
@Override
public void compute(Iterable<IntWritable> msgs) throws IOException {
if (this.getSuperstepCount() == 0) {
createSumVertex();
} else if (this.getSuperstepCount() == 1) {
sendAllValuesToSumAndRemove();
} else if (this.getSuperstepCount() == 2) {
calculateSum(msgs);
} else if (this.getSuperstepCount() == 3) {
this.voteToHalt();
}
}
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
if (args.length != 2) {
printUsage();
}
HamaConfiguration conf = new HamaConfiguration(new Configuration());
GraphJob graphJob = createJob(args, conf);
long startTime = System.currentTimeMillis();
if (graphJob.waitForCompletion(true)) {
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
private static void printUsage() {
System.out.println("Usage: <input> <output>");
System.exit(-1);
}
private static GraphJob createJob(String[] args, HamaConfiguration conf)
throws IOException {
GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
graphJob.setJobName("Dynamic Graph");
graphJob.setVertexClass(GraphVertex.class);
graphJob.setInputPath(new Path(args[0]));
graphJob.setOutputPath(new Path(args[1]));
graphJob.setVertexIDClass(Text.class);
graphJob.setVertexValueClass(IntWritable.class);
graphJob.setEdgeValueClass(NullWritable.class);
graphJob.setInputFormat(TextInputFormat.class);
graphJob.setInputKeyClass(LongWritable.class);
graphJob.setInputValueClass(Text.class);
graphJob.setVertexInputReaderClass(GraphTextReader.class);
graphJob.setPartitioner(HashPartitioner.class);
graphJob.setOutputFormat(TextOutputFormat.class);
graphJob.setOutputKeyClass(Text.class);
graphJob.setOutputValueClass(IntWritable.class);
return graphJob;
}
}