blob: fb04b019af97efc351e022067db93ea25eaa5369 [file] [log] [blame]
/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.pregelix.api.job;
import java.io.IOException;
import java.lang.reflect.Modifier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.util.GlobalEdgeCountAggregator;
import edu.uci.ics.pregelix.api.util.GlobalVertexCountAggregator;
import edu.uci.ics.pregelix.api.util.HadoopCountersAggregator;
import edu.uci.ics.pregelix.api.util.HadoopCountersGlobalAggregateHook;
/**
* This class represents a Pregelix job.
*/
public class PregelixJob extends Job {
/** Vertex class - required */
public static final String VERTEX_CLASS = "pregelix.vertexClass";
/** VertexInputFormat class - required */
public static final String VERTEX_INPUT_FORMAT_CLASS = "pregelix.vertexInputFormatClass";
/** VertexOutputFormat class - optional */
public static final String VERTEX_OUTPUT_FORMAT_CLASS = "pregelix.vertexOutputFormatClass";
/** Vertex combiner class - optional */
public static final String Message_COMBINER_CLASS = "pregelix.combinerClass";
/** Global aggregator class - optional */
public static final String GLOBAL_AGGREGATOR_CLASS = "pregelix.aggregatorClass";
/** Vertex resolver class - optional */
public static final String VERTEX_RESOLVER_CLASS = "pregelix.vertexResolverClass";
/** Vertex index class */
public static final String VERTEX_INDEX_CLASS = "pregelix.vertexIndexClass";
/** Vertex value class */
public static final String VERTEX_VALUE_CLASS = "pregelix.vertexValueClass";
/** Edge value class */
public static final String EDGE_VALUE_CLASS = "pregelix.edgeValueClass";
/** Message value class */
public static final String MESSAGE_VALUE_CLASS = "pregelix.messageValueClass";
/** Partial combiner value class */
public static final String PARTIAL_COMBINE_VALUE_CLASS = "pregelix.partialCombinedValueClass";
/** Partial aggregate value class */
public static final String PARTIAL_AGGREGATE_VALUE_CLASS = "pregelix.partialAggregateValueClass";
/** Final aggregate value class */
public static final String FINAL_AGGREGATE_VALUE_CLASS = "pregelix.finalAggregateValueClass";
/** The normalized key computer class */
public static final String NMK_COMPUTER_CLASS = "pregelix.nmkComputerClass";
/** The partitioner class */
public static final String PARTITIONER_CLASS = "pregelix.partitionerClass";
/** num of vertices */
public static final String NUM_VERTICE = "pregelix.numVertices";
/** num of edges */
public static final String NUM_EDGES = "pregelix.numEdges";
/** increase state length */
public static final String INCREASE_STATE_LENGTH = "pregelix.incStateLength";
/** job id */
public static final String JOB_ID = "pregelix.jobid";
/** frame size */
public static final String FRAME_SIZE = "pregelix.framesize";
/** update intensive */
public static final String UPDATE_INTENSIVE = "pregelix.updateIntensive";
/** the check point hook */
public static final String CKP_CLASS = "pregelix.checkpointHook";
/** the check point hook */
public static final String RECOVERY_COUNT = "pregelix.recoveryCount";
/** the checkpoint interval */
public static final String CKP_INTERVAL = "pregelix.ckpinterval";
/** the dynamic optimization */
public static final String DYNAMIC_OPTIMIZATION = "pregelix.dynamicopt";
/** the iteration complete reporter hook */
public static final String ITERATION_COMPLETE_CLASS = "pregelix.iterationCompleteReporter";
/** comma */
public static final String COMMA_STR = ",";
/** period */
public static final String PERIOD_STR = ".";
/** the names of the aggregator classes active for all vertex types */
public static final String[] DEFAULT_GLOBAL_AGGREGATOR_CLASSES = { GlobalVertexCountAggregator.class.getName(),
GlobalEdgeCountAggregator.class.getName() };
/** The name of an optional class that aggregates all Vertexes into mapreduce.Counters */
public static final String COUNTERS_AGGREGATOR_CLASS = "pregelix.aggregatedCountersClass";
/** the group-by algorithm */
public static final String GROUPING_ALGORITHM = "pregelix.groupalg";
/** the memory assigned to group-by */
public static final String GROUPING_MEM = "pregelix.groupmem";
/** the memory assigned for the sort operator */
public static final String SORT_MEM = "pregelix.sortmem";
/** the number of workers */
public static final String NUM_WORKERS = "pregelix.numworkers";
/** the application allows to skip combiner key during aggregations */
public static final String SKIP_COMBINER_KEY = "pregelix.skipCombinerKey";
/** the merge connector */
public static final String MERGE_CONNECTOR = "pregelix.merge";
/** the maximum allowed iteration */
public static final String MAX_ITERATION="pregelix.maxiteration";
/**
* Construct a Pregelix job from an existing configuration
*
* @param conf
* @throws IOException
*/
public PregelixJob(Configuration conf) throws IOException {
super(conf);
}
/**
* Constructor that will instantiate the configuration
*
* @param jobName
* User-defined job name
* @throws IOException
*/
public PregelixJob(String jobName) throws IOException {
super(new Configuration(), jobName);
}
/**
* Constructor.
*
* @param conf
* User-defined configuration
* @param jobName
* User-defined job name
* @throws IOException
*/
public PregelixJob(Configuration conf, String jobName) throws IOException {
super(conf, jobName);
}
/**
* Set the vertex class (required)
*
* @param vertexClass
* Runs vertex computation
*/
final public void setVertexClass(Class<?> vertexClass) {
getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class);
}
/**
* Set the vertex input format class (required)
*
* @param vertexInputFormatClass
* Determines how graph is input
*/
final public void setVertexInputFormatClass(Class<?> vertexInputFormatClass) {
getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS, vertexInputFormatClass, VertexInputFormat.class);
}
/**
* Set the vertex output format class (optional)
*
* @param vertexOutputFormatClass
* Determines how graph is output
*/
final public void setVertexOutputFormatClass(Class<?> vertexOutputFormatClass) {
getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS, vertexOutputFormatClass, VertexOutputFormat.class);
}
/**
* Set the vertex combiner class (optional)
*
* @param vertexCombinerClass
* Determines how vertex messages are combined
*/
final public void setMessageCombinerClass(Class<?> vertexCombinerClass) {
getConfiguration().setClass(Message_COMBINER_CLASS, vertexCombinerClass, MessageCombiner.class);
}
/**
* Set the global aggregator class (optional)
*
* @param globalAggregatorClass
* Determines how messages are globally aggregated
*/
final public void addGlobalAggregatorClass(Class<?> globalAggregatorClass) {
String aggStr = globalAggregatorClass.getName();
String classes = getConfiguration().get(GLOBAL_AGGREGATOR_CLASS);
conf.set(GLOBAL_AGGREGATOR_CLASS, classes == null ? aggStr : classes + COMMA_STR + aggStr);
}
/**
* Set the job Id
*/
final public void setJobId(String jobId) {
getConfiguration().set(JOB_ID, jobId);
}
/**
* Set whether the vertex state length can be dynamically increased
*
* @param jobId
*/
final public void setDynamicVertexValueSize(boolean incStateLengthDynamically) {
getConfiguration().setBoolean(INCREASE_STATE_LENGTH, incStateLengthDynamically);
}
/**
* Set whether the vertex state length is fixed
*
* @param jobId
*/
final public void setFixedVertexValueSize(boolean fixedSize) {
getConfiguration().setBoolean(INCREASE_STATE_LENGTH, !fixedSize);
}
/**
* Set the frame size for a job
*
* @param frameSize
* the desired frame size
*/
final public void setFrameSize(int frameSize) {
getConfiguration().setInt(FRAME_SIZE, frameSize);
}
/**
* Set the normalized key computer class
*
* @param nkcClass
*/
final public void setNoramlizedKeyComputerClass(Class<?> nkcClass) {
getConfiguration().setClass(NMK_COMPUTER_CLASS, nkcClass, NormalizedKeyComputer.class);
}
/**
* Set the vertex partitioner class
*
* @param partitionerClass
*/
final public void setVertexPartitionerClass(Class<?> partitionerClass) {
getConfiguration().setClass(PARTITIONER_CLASS, partitionerClass, VertexPartitioner.class);
}
/**
* Indicate if the job needs to do a lot of graph mutations or variable size updates
*
* @param updateHeavyFlag
*/
final public void setLSMStorage(boolean variableSizedUpdateHeavyFlag) {
getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
}
/**
* Users can provide an ICheckpointHook implementation to specify when to do checkpoint
*
* @param ckpClass
*/
final public void setCheckpointHook(Class<?> ckpClass) {
getConfiguration().setClass(CKP_CLASS, ckpClass, ICheckpointHook.class);
}
/**
* Users can provide an ICheckpointHook implementation to specify when to do checkpoint
*
* @param ckpClass
*/
final public void setRecoveryCount(int recoveryCount) {
getConfiguration().setInt(RECOVERY_COUNT, recoveryCount);
}
/**
* Users can set the interval of checkpointing
*
* @param ckpInterval
*/
final public void setCheckpointingInterval(int ckpInterval) {
getConfiguration().setInt(CKP_INTERVAL, ckpInterval);
}
/**
* Users can provide an IIterationCompleteReporterHook implementation to perform actions
* at the end of each iteration
*
* @param reporterClass
*/
final public void setIterationCompleteReporterHook(Class<? extends IIterationCompleteReporterHook> reporterClass) {
getConfiguration().setClass(ITERATION_COMPLETE_CLASS, reporterClass, IIterationCompleteReporterHook.class);
}
/**
* Indicate if dynamic optimization is enabled
*
* @param dynamicOpt
*/
final public void setEnableDynamicOptimization(boolean dynamicOpt) {
getConfiguration().setBoolean(DYNAMIC_OPTIMIZATION, dynamicOpt);
}
/**
* Set the counter aggregator class
*
* @param aggClass
*/
final public void setCounterAggregatorClass(Class<? extends HadoopCountersAggregator<?, ?, ?, ?, ?>> aggClass) {
if (Modifier.isAbstract(aggClass.getModifiers())) {
throw new IllegalArgumentException("Aggregate class must be a concrete class, not an abstract one! (was "
+ aggClass.getName() + ")");
}
getConfiguration().setClass(COUNTERS_AGGREGATOR_CLASS, aggClass, HadoopCountersAggregator.class);
addGlobalAggregatorClass(aggClass);
setIterationCompleteReporterHook(HadoopCountersGlobalAggregateHook.class);
}
/**
* Set the group-by algorithm: sort-true or hash-false
*
* @param sortOrHash
*/
final public void setGroupByAlgorithm(boolean sortOrHash) {
getConfiguration().setBoolean(GROUPING_ALGORITHM, sortOrHash);
}
/**
* Set the memory buget for group-by operators (only hash-based)
*
* @param numberOfPages
*/
final public void setGroupByMemoryLimit(int numberOfPages) {
getConfiguration().setInt(GROUPING_MEM, numberOfPages);
}
/**
* Set the memory buget for sort operators (only hash-based)
*
* @param numberOfPages
*/
final public void setSortMemoryLimit(int numberOfPages) {
getConfiguration().setInt(SORT_MEM, numberOfPages);
}
/**
* Set the number of workers
*
* @param numWorkers
*/
final public void setNumWorkers(int numWorkers) {
getConfiguration().setInt(NUM_WORKERS, numWorkers);
}
/**
* Whether an application allows to skip the combiner key during message combination,
* this is a performance improvement tip.
* By default, the key is not skipped
*
* @param skip
* true to skip; otherwise, not.
*/
final public void setSkipCombinerKey(boolean skip) {
getConfiguration().setBoolean(SKIP_COMBINER_KEY, skip);
}
/**
* Whether to use merge connector
*
* @param merge
*/
final public void setMergeConnector(boolean merge){
getConfiguration().setBoolean(MERGE_CONNECTOR, merge);
}
/***
* Set the maximum allowed iteration
*
* @param iteration
*/
final public void setMaxIteration(int iteration){
getConfiguration().setInt(MAX_ITERATION, iteration);
}
@Override
public String toString() {
return getJobName();
}
}