blob: 71253cfb87770ddfae5e677fd8fe2404aeedcb43 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.examples;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.VertexReader;
import org.apache.giraph.graph.VertexWriter;
import org.apache.giraph.lib.TextVertexInputFormat;
import org.apache.giraph.lib.TextVertexInputFormat.TextVertexReader;
import org.apache.giraph.lib.TextVertexOutputFormat;
import org.apache.giraph.lib.TextVertexOutputFormat.TextVertexWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONException;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
/**
* Demonstrates the basic Pregel shortest paths implementation.
*/
public class SimpleShortestPathsVertex extends
    EdgeListVertex<LongWritable, DoubleWritable,
    FloatWritable, DoubleWritable> implements Tool {
  /** Configuration key under which the source vertex id is stored */
  public static final String SOURCE_ID = "SimpleShortestPathsVertex.sourceId";
  /** Source vertex id used when {@link #SOURCE_ID} is not configured */
  public static final long SOURCE_ID_DEFAULT = 1;
  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(SimpleShortestPathsVertex.class);
  /** Configuration handed in through {@link #setConf(Configuration)} */
  private Configuration conf;

  /**
   * Is this vertex the source id?  The source id is looked up in the job
   * configuration under {@link #SOURCE_ID}, falling back to
   * {@link #SOURCE_ID_DEFAULT}.
   *
   * @return True if the source id
   */
  private boolean isSource() {
    long sourceId = getContext().getConfiguration().getLong(
        SOURCE_ID, SOURCE_ID_DEFAULT);
    return getVertexId().get() == sourceId;
  }

  /**
   * Runs one superstep for this vertex.  On superstep 0 the vertex value is
   * reset to {@link Double#MAX_VALUE}.  The candidate distance is the minimum
   * of the incoming messages (or 0 for the source vertex); when it improves
   * on the stored vertex value, the value is replaced and the candidate
   * distance plus the edge value is sent along every outgoing edge.  The
   * vertex votes to halt at the end of every call.
   *
   * @param msgIterator Distances proposed by other vertices
   */
  @Override
  public void compute(Iterator<DoubleWritable> msgIterator) {
    if (getSuperstep() == 0) {
      // Whatever value was loaded from the input is discarded here.
      setVertexValue(new DoubleWritable(Double.MAX_VALUE));
    }
    double minDist = isSource() ? 0d : Double.MAX_VALUE;
    while (msgIterator.hasNext()) {
      minDist = Math.min(minDist, msgIterator.next().get());
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
          " vertex value = " + getVertexValue());
    }
    if (minDist < getVertexValue().get()) {
      setVertexValue(new DoubleWritable(minDist));
      for (LongWritable targetVertexId : this) {
        FloatWritable edgeValue = getEdgeValue(targetVertexId);
        // Computed once and shared by the debug output and the message.
        double distance = minDist + edgeValue.get();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Vertex " + getVertexId() + " sent to " +
              targetVertexId + " = " + distance);
        }
        sendMsg(targetVertexId, new DoubleWritable(distance));
      }
    }
    voteToHalt();
  }

  /**
   * VertexInputFormat that supports {@link SimpleShortestPathsVertex}
   */
  public static class SimpleShortestPathsVertexInputFormat extends
      TextVertexInputFormat<LongWritable,
      DoubleWritable,
      FloatWritable,
      DoubleWritable> {
    /**
     * Wraps the record reader of the underlying text input format in a
     * {@link SimpleShortestPathsVertexReader}.
     *
     * @param split Input split to read
     * @param context Task attempt context
     * @return Vertex reader for the split
     * @throws IOException Declared by the overridden method
     */
    @Override
    public VertexReader<LongWritable, DoubleWritable, FloatWritable,
        DoubleWritable> createVertexReader(InputSplit split,
        TaskAttemptContext context) throws IOException {
      return new SimpleShortestPathsVertexReader(
          textInputFormat.createRecordReader(split, context));
    }
  }

  /**
   * VertexReader that supports {@link SimpleShortestPathsVertex}.  Each
   * edge value is read as a float and is added to the distance sent along
   * that edge.  The vertex value is parsed as well, but
   * {@link SimpleShortestPathsVertex#compute(Iterator)} overwrites it on
   * superstep 0.  The files should be in the following JSON format:
   * <pre>
   * JSONArray(&lt;vertex id&gt;, &lt;vertex value&gt;,
   *   JSONArray(JSONArray(&lt;dest vertex id&gt;, &lt;edge value&gt;), ...))
   * </pre>
   * Here is an example with vertex id 1, vertex value 4.3, and two edges.
   * First edge has a destination vertex 2, edge value 2.1.
   * Second edge has a destination vertex 3, edge value 0.7.
   * <pre>
   * [1,4.3,[[2,2.1],[3,0.7]]]
   * </pre>
   */
  public static class SimpleShortestPathsVertexReader extends
      TextVertexReader<LongWritable,
      DoubleWritable, FloatWritable, DoubleWritable> {
    /**
     * Constructor.
     *
     * @param lineRecordReader Reader that supplies one line per vertex
     */
    public SimpleShortestPathsVertexReader(
        RecordReader<LongWritable, Text> lineRecordReader) {
      super(lineRecordReader);
    }

    /**
     * Parses the current line of the record reader into a vertex.
     *
     * @return Vertex initialized with the id, value and edges of the line
     * @throws IOException Declared by the overridden method
     * @throws InterruptedException Declared by the overridden method
     * @throws IllegalArgumentException If the line is not in the expected
     *         JSON format
     */
    @Override
    public BasicVertex<LongWritable, DoubleWritable, FloatWritable,
        DoubleWritable> getCurrentVertex()
      throws IOException, InterruptedException {
      BasicVertex<LongWritable, DoubleWritable, FloatWritable,
          DoubleWritable> vertex =
          BspUtils.<LongWritable, DoubleWritable, FloatWritable,
              DoubleWritable>createVertex(getContext().getConfiguration());
      Text line = getRecordReader().getCurrentValue();
      try {
        JSONArray jsonVertex = new JSONArray(line.toString());
        LongWritable vertexId = new LongWritable(jsonVertex.getLong(0));
        DoubleWritable vertexValue =
            new DoubleWritable(jsonVertex.getDouble(1));
        Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
        JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
        for (int i = 0; i < jsonEdgeArray.length(); ++i) {
          JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
          // Edge values are narrowed from the parsed double to a float.
          edges.put(new LongWritable(jsonEdge.getLong(0)),
              new FloatWritable((float) jsonEdge.getDouble(1)));
        }
        // NOTE(review): the last argument is presumably the initial
        // message list - confirm against BasicVertex#initialize.
        vertex.initialize(vertexId, vertexValue, edges, null);
      } catch (JSONException e) {
        throw new IllegalArgumentException(
            "next: Couldn't get vertex from line " + line, e);
      }
      return vertex;
    }

    /**
     * Advances the underlying record reader.
     *
     * @return Result of {@code nextKeyValue()} on the record reader
     * @throws IOException Declared by the overridden method
     * @throws InterruptedException Declared by the overridden method
     */
    @Override
    public boolean nextVertex() throws IOException, InterruptedException {
      return getRecordReader().nextKeyValue();
    }
  }

  /**
   * VertexOutputFormat that supports {@link SimpleShortestPathsVertex}
   */
  public static class SimpleShortestPathsVertexOutputFormat extends
      TextVertexOutputFormat<LongWritable, DoubleWritable,
      FloatWritable> {
    /**
     * Wraps the record writer of the underlying text output format in a
     * {@link SimpleShortestPathsVertexWriter}.
     *
     * @param context Task attempt context
     * @return Vertex writer for the task
     * @throws IOException Declared by the overridden method
     * @throws InterruptedException Declared by the overridden method
     */
    @Override
    public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
    createVertexWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
      RecordWriter<Text, Text> recordWriter =
          textOutputFormat.getRecordWriter(context);
      return new SimpleShortestPathsVertexWriter(recordWriter);
    }
  }

  /**
   * VertexWriter that supports {@link SimpleShortestPathsVertex}.  Every
   * vertex is written as one JSON array:
   * [vertex id, vertex value, [[dest vertex id, edge value], ...]]
   */
  public static class SimpleShortestPathsVertexWriter extends
      TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
    /**
     * Constructor.
     *
     * @param lineRecordWriter Writer that receives one record per vertex
     */
    public SimpleShortestPathsVertexWriter(
        RecordWriter<Text, Text> lineRecordWriter) {
      super(lineRecordWriter);
    }

    /**
     * Serializes the vertex to JSON and writes it as the record key; the
     * record value is null.
     *
     * @param vertex Vertex to write
     * @throws IOException Declared by the overridden method
     * @throws InterruptedException Declared by the overridden method
     * @throws IllegalArgumentException If the vertex cannot be turned into
     *         JSON; the JSONException is attached as the cause
     */
    @Override
    public void writeVertex(BasicVertex<LongWritable, DoubleWritable,
        FloatWritable, ?> vertex)
      throws IOException, InterruptedException {
      JSONArray jsonVertex = new JSONArray();
      try {
        jsonVertex.put(vertex.getVertexId().get());
        jsonVertex.put(vertex.getVertexValue().get());
        JSONArray jsonEdgeArray = new JSONArray();
        for (LongWritable targetVertexId : vertex) {
          JSONArray jsonEdge = new JSONArray();
          jsonEdge.put(targetVertexId.get());
          jsonEdge.put(vertex.getEdgeValue(targetVertexId).get());
          jsonEdgeArray.put(jsonEdge);
        }
        jsonVertex.put(jsonEdgeArray);
      } catch (JSONException e) {
        // Keep the JSONException as the cause so the reason is not lost.
        throw new IllegalArgumentException(
            "writeVertex: Couldn't write vertex " + vertex, e);
      }
      getRecordWriter().write(new Text(jsonVertex.toString()), null);
    }
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Configures and runs the shortest paths job.
   *
   * @param argArray Exactly four arguments: input path, output path,
   *        source vertex id and number of workers
   * @return 0 if the job succeeded, -1 otherwise
   * @throws IllegalArgumentException If the number of arguments is not 4
   * @throws Exception Anything thrown while parsing the numeric arguments
   *         or running the job
   */
  @Override
  public int run(String[] argArray) throws Exception {
    Preconditions.checkArgument(argArray.length == 4,
        "run: Must have 4 arguments <input path> <output path> " +
        "<source vertex id> <# of workers>");
    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
    job.setVertexClass(getClass());
    job.setVertexInputFormatClass(
        SimpleShortestPathsVertexInputFormat.class);
    job.setVertexOutputFormatClass(
        SimpleShortestPathsVertexOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(argArray[0]));
    FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
    job.getConfiguration().setLong(SimpleShortestPathsVertex.SOURCE_ID,
        Long.parseLong(argArray[2]));
    // The same worker count is passed for the first two arguments.
    // NOTE(review): presumably min workers, max workers and a required
    // response percentage - confirm against GiraphJob.
    int workers = Integer.parseInt(argArray[3]);
    job.setWorkerConfiguration(workers, workers, 100.0f);
    return job.run(true) ? 0 : -1;
  }

  /**
   * Command line entry point; exits the JVM with the status returned by
   * {@link ToolRunner#run(Tool, String[])}.
   *
   * @param args Command line arguments, see {@link #run(String[])}
   * @throws Exception Anything thrown by the tool runner
   */
  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new SimpleShortestPathsVertex(), args));
  }
}