blob: 31741f4663c631c9f1af612b0266e4e1bf6937ed [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.graph;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.OutgoingMessageManager;
import org.apache.hama.bsp.message.queue.MessageQueue;
import com.google.common.base.Preconditions;
public class GraphJob extends BSPJob {
public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class";
public final static String VERTEX_ID_CLASS_ATTR = "hama.graph.vertex.id.class";
public final static String VERTEX_VALUE_CLASS_ATTR = "hama.graph.vertex.value.class";
public final static String VERTEX_EDGE_VALUE_CLASS_ATTR = "hama.graph.vertex.edge.value.class";
public final static String VERTEX_OUTPUT_WRITER_CLASS_ATTR = "hama.graph.vertex.output.writer.class";
public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
/**
* Creates a new Graph Job with the given configuration and an exampleClass.
* The exampleClass is used to determine the user's jar to distribute in the
* cluster. This constructor sets the vertex id class to {@link Text}, the
* vertex value class to {@link IntWritable} and the edge value class to
* {@link IntWritable}.
*/
public GraphJob(HamaConfiguration conf, Class<?> exampleClass)
throws IOException {
super(conf);
conf.setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
OutgoingVertexMessageManager.class, OutgoingMessageManager.class);
conf.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, false);
conf.setBoolean("hama.use.unsafeserialization", true);
this.setBspClass(GraphJobRunner.class);
this.setJarByClass(exampleClass);
this.setVertexIDClass(Text.class);
this.setVertexValueClass(IntWritable.class);
this.setEdgeValueClass(IntWritable.class);
this.setPartitioner(HashPartitioner.class);
this.setMessageQueueBehaviour(MessageQueue.PERSISTENT_QUEUE);
}
/**
* Set the Vertex class for the job.
*/
public void setVertexClass(
Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>> cls)
throws IllegalStateException {
conf.setClass(VERTEX_CLASS_ATTR, cls, Vertex.class);
setInputKeyClass(cls);
setInputValueClass(NullWritable.class);
}
/**
* Set the Vertex ID class for the job.
*/
public void setVertexIDClass(Class<? extends Writable> cls)
throws IllegalStateException {
conf.setClass(VERTEX_ID_CLASS_ATTR, cls, Writable.class);
}
/**
* Set the Vertex value class for the job.
*/
public void setVertexValueClass(Class<? extends Writable> cls)
throws IllegalStateException {
conf.setClass(VERTEX_VALUE_CLASS_ATTR, cls, Writable.class);
}
/**
* Set the Edge value class for the job.
*/
public void setEdgeValueClass(Class<? extends Writable> cls)
throws IllegalStateException {
conf.setClass(VERTEX_EDGE_VALUE_CLASS_ATTR, cls, Writable.class);
}
/**
* Set the aggregator for the job.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public void setAggregatorClass(Class<? extends Aggregator> cls) {
this.setAggregatorClass(new Class[] { cls });
}
/**
* Sets multiple aggregators for the job.
*/
@SuppressWarnings("rawtypes")
public void setAggregatorClass(Class<? extends Aggregator>... cls) {
String classNames = "";
for (Class<? extends Aggregator> cl : cls) {
classNames += cl.getName() + ";";
}
conf.set(AGGREGATOR_CLASS_ATTR, classNames);
}
/**
* Sets the input reader for parsing the input to vertices.
*/
public void setVertexInputReaderClass(
@SuppressWarnings("rawtypes") Class<? extends VertexInputReader> cls) {
ensureState(JobState.DEFINE);
conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls,
RecordConverter.class);
}
/**
* Sets the output writer for materializing vertices to the output sink. If
* not set, the default DefaultVertexOutputWriter will be used.
*/
public void setVertexOutputWriterClass(
@SuppressWarnings("rawtypes") Class<? extends VertexOutputWriter> cls) {
ensureState(JobState.DEFINE);
conf.setClass(VERTEX_OUTPUT_WRITER_CLASS_ATTR, cls,
VertexOutputWriter.class);
}
@SuppressWarnings("unchecked")
public Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>> getVertexClass() {
return (Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>>) conf
.getClass(VERTEX_CLASS_ATTR, Vertex.class);
}
@Override
public void setPartitioner(
@SuppressWarnings("rawtypes") Class<? extends Partitioner> theClass) {
super.setPartitioner(theClass);
}
@Override
public void setCombinerClass(Class<? extends Combiner<? extends Writable>> cls) {
ensureState(JobState.DEFINE);
conf.setClass(Constants.COMBINER_CLASS, cls, Combiner.class);
}
/**
* Sets how many iterations the algorithm should perform, -1 for deactivated
* is default value.
*/
public void setMaxIteration(int maxIteration) {
conf.setInt("hama.graph.max.iteration", maxIteration);
}
@Override
public void submit() throws IOException, InterruptedException {
Preconditions.checkArgument(
this.getConfiguration().get(VERTEX_CLASS_ATTR) != null,
"Please provide a vertex class!");
Preconditions.checkArgument(
this.getConfiguration().get(VERTEX_ID_CLASS_ATTR) != null,
"Please provide an vertex ID class!");
Preconditions
.checkArgument(
this.getConfiguration().get(VERTEX_VALUE_CLASS_ATTR) != null,
"Please provide an vertex value class, if you don't need one, use NullWritable!");
Preconditions
.checkArgument(this.getConfiguration()
.get(VERTEX_EDGE_VALUE_CLASS_ATTR) != null,
"Please provide an edge value class, if you don't need one, use NullWritable!");
Preconditions
.checkArgument(
this.getConfiguration().get(
Constants.RUNTIME_PARTITION_RECORDCONVERTER) != null,
"Please provide a converter class for your vertex by using GraphJob#setVertexInputReaderClass!");
if (this.getConfiguration().get(VERTEX_OUTPUT_WRITER_CLASS_ATTR) == null) {
this.setVertexOutputWriterClass(DefaultVertexOutputWriter.class);
}
this.getConfiguration().setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
IncomingVertexMessageManager.class, MessageQueue.class);
super.submit();
}
}