| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.giraph.graph; |
| |
| import org.apache.giraph.bsp.BspInputFormat; |
| import org.apache.giraph.bsp.BspOutputFormat; |
| import org.apache.giraph.graph.partition.GraphPartitionerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.ipc.Client; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.log4j.Logger; |
| |
| import java.io.IOException; |
| |
| /** |
| * Limits the functions that can be called by the user. Job is too flexible |
| * for our needs. For instance, our job should not have any reduce tasks. |
| */ |
| public class GiraphJob extends Job { |
| /** Vertex class - required */ |
| public static final String VERTEX_CLASS = "giraph.vertexClass"; |
| /** VertexInputFormat class - required */ |
| public static final String VERTEX_INPUT_FORMAT_CLASS = |
| "giraph.vertexInputFormatClass"; |
| |
| /** VertexOutputFormat class - optional */ |
| public static final String VERTEX_OUTPUT_FORMAT_CLASS = |
| "giraph.vertexOutputFormatClass"; |
| /** Vertex combiner class - optional */ |
| public static final String VERTEX_COMBINER_CLASS = |
| "giraph.combinerClass"; |
| /** Vertex resolver class - optional */ |
| public static final String VERTEX_RESOLVER_CLASS = |
| "giraph.vertexResolverClass"; |
| /** Graph partitioner factory class - optional */ |
| public static final String GRAPH_PARTITIONER_FACTORY_CLASS = |
| "giraph.graphPartitionerFactoryClass"; |
| |
| /** Vertex index class */ |
| public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass"; |
| /** Vertex value class */ |
| public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass"; |
| /** Edge value class */ |
| public static final String EDGE_VALUE_CLASS = "giraph.edgeValueClass"; |
| /** Message value class */ |
| public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass"; |
| /** Worker context class */ |
| public static final String WORKER_CONTEXT_CLASS = |
| "giraph.workerContextClass"; |
| /** AggregatorWriter class - optional */ |
| public static final String AGGREGATOR_WRITER_CLASS = |
| "giraph.aggregatorWriterClass"; |
| |
| /** |
| * Minimum number of simultaneous workers before this job can run (int) |
| */ |
| public static final String MIN_WORKERS = "giraph.minWorkers"; |
| /** |
| * Maximum number of simultaneous worker tasks started by this job (int). |
| */ |
| public static final String MAX_WORKERS = "giraph.maxWorkers"; |
| |
| /** |
| * Separate the workers and the master tasks. This is required |
| * to support dynamic recovery. (boolean) |
| */ |
| public static final String SPLIT_MASTER_WORKER = |
| "giraph.SplitMasterWorker"; |
| /** |
| * Default on whether to separate the workers and the master tasks. |
| * Needs to be "true" to support dynamic recovery. |
| */ |
| public static final boolean SPLIT_MASTER_WORKER_DEFAULT = true; |
| |
| /** Indicates whether this job is run in an internal unit test */ |
| public static final String LOCAL_TEST_MODE = |
| "giraph.localTestMode"; |
| |
| /** not in local test mode per default */ |
| public static final boolean LOCAL_TEST_MODE_DEFAULT = false; |
| |
| /** |
| * Minimum percent of the maximum number of workers that have responded |
| * in order to continue progressing. (float) |
| */ |
| public static final String MIN_PERCENT_RESPONDED = |
| "giraph.minPercentResponded"; |
| /** Default 100% response rate for workers */ |
| public static final float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f; |
| |
| /** Polling timeout to check on the number of responded tasks (int) */ |
| public static final String POLL_MSECS = "giraph.pollMsecs"; |
| /** Default poll msecs (30 seconds) */ |
| public static final int POLL_MSECS_DEFAULT = 30*1000; |
| |
| /** |
| * ZooKeeper comma-separated list (if not set, |
| * will start up ZooKeeper locally) |
| */ |
| public static final String ZOOKEEPER_LIST = "giraph.zkList"; |
| |
| /** ZooKeeper session millisecond timeout */ |
| public static final String ZOOKEEPER_SESSION_TIMEOUT = |
| "giraph.zkSessionMsecTimeout"; |
| /** Default Zookeeper session millisecond timeout */ |
| public static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 60*1000; |
| |
| /** Polling interval to check for the final ZooKeeper server data */ |
| public static final String ZOOKEEPER_SERVERLIST_POLL_MSECS = |
| "giraph.zkServerlistPollMsecs"; |
| /** Default polling interval to check for the final ZooKeeper server data */ |
| public static final int ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT = |
| 3*1000; |
| |
| /** Number of nodes (not tasks) to run Zookeeper on */ |
| public static final String ZOOKEEPER_SERVER_COUNT = |
| "giraph.zkServerCount"; |
| /** Default number of nodes to run Zookeeper on */ |
| public static final int ZOOKEEPER_SERVER_COUNT_DEFAULT = 1; |
| |
| /** ZooKeeper port to use */ |
| public static final String ZOOKEEPER_SERVER_PORT = |
| "giraph.zkServerPort"; |
| /** Default ZooKeeper port to use */ |
| public static final int ZOOKEEPER_SERVER_PORT_DEFAULT = 22181; |
| |
| /** Location of the ZooKeeper jar - Used internally, not meant for users */ |
| public static final String ZOOKEEPER_JAR = "giraph.zkJar"; |
| |
| /** Local ZooKeeper directory to use */ |
| public static final String ZOOKEEPER_DIR = "giraph.zkDir"; |
| |
| /** Initial port to start using for the RPC communication */ |
| public static final String RPC_INITIAL_PORT = "giraph.rpcInitialPort"; |
| /** Default port to start using for the RPC communication */ |
| public static final int RPC_INITIAL_PORT_DEFAULT = 30000; |
| |
| /** Maximum bind attempts for different RPC ports */ |
| public static final String MAX_RPC_PORT_BIND_ATTEMPTS = |
| "giraph.maxRpcPortBindAttempts"; |
| /** Default maximum bind attempts for different RPC ports */ |
| public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20; |
| |
| /** Maximum number of RPC handlers */ |
| public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers"; |
| /** Default maximum number of RPC handlers */ |
| public static final int RPC_NUM_HANDLERS_DEFAULT = 100; |
| |
| /** |
| * Maximum number of vertices per partition before sending. |
| * (input superstep only). |
| */ |
| public static final String MAX_VERTICES_PER_PARTITION = |
| "giraph.maxVerticesPerPartition"; |
| /** Default maximum number of vertices per partition before sending. */ |
| public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 100000; |
| |
| /** Maximum number of messages per peer before flush */ |
| public static final String MSG_SIZE = "giraph.msgSize"; |
| /** Default maximum number of messages per peer before flush */ |
| public static final int MSG_SIZE_DEFAULT = 1000; |
| |
| /** Maximum number of messages that can be bulk sent during a flush */ |
| public static final String MAX_MESSAGES_PER_FLUSH_PUT = |
| "giraph.maxMessagesPerFlushPut"; |
| /** Default number of messages that can be bulk sent during a flush */ |
| public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 5000; |
| |
| /** Number of flush threads per peer */ |
| public static final String MSG_NUM_FLUSH_THREADS = |
| "giraph.msgNumFlushThreads"; |
| |
| /** Number of poll attempts prior to failing the job (int) */ |
| public static final String POLL_ATTEMPTS = "giraph.pollAttempts"; |
| /** Default poll attempts */ |
| public static final int POLL_ATTEMPTS_DEFAULT = 10; |
| |
| /** Number of minimum vertices in each vertex range */ |
| public static final String MIN_VERTICES_PER_RANGE = |
| "giraph.minVerticesPerRange"; |
| /** Default number of minimum vertices in each vertex range */ |
| public static final long MIN_VERTICES_PER_RANGE_DEFAULT = 3; |
| |
| /** Minimum stragglers of the superstep before printing them out */ |
| public static final String PARTITION_LONG_TAIL_MIN_PRINT = |
| "giraph.partitionLongTailMinPrint"; |
| /** Only print stragglers with one as a default */ |
| public static final int PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT = 1; |
| |
| /** Use superstep counters? (boolean) */ |
| public static final String USE_SUPERSTEP_COUNTERS = |
| "giraph.useSuperstepCounters"; |
| /** Default is to use the superstep counters */ |
| public static final boolean USE_SUPERSTEP_COUNTERS_DEFAULT = true; |
| |
| /** |
| * Set the multiplicative factor of how many partitions to create from |
| * a single InputSplit based on the number of total InputSplits. For |
| * example, if there are 10 total InputSplits and this is set to 0.5, then |
| * you will get 0.5 * 10 = 5 partitions for every InputSplit (given that the |
| * minimum size is met). |
| */ |
| public static final String TOTAL_INPUT_SPLIT_MULTIPLIER = |
| "giraph.totalInputSplitMultiplier"; |
| /** Default total input split multiplier */ |
| public static final float TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT = 0.5f; |
| |
| /** |
| * Input split sample percent - Used only for sampling and testing, rather |
| * than an actual job. The idea is that to test, you might only want a |
| * fraction of the actual input splits from your VertexInputFormat to |
| * load (values should be [0, 100]). |
| */ |
| public static final String INPUT_SPLIT_SAMPLE_PERCENT = |
| "giraph.inputSplitSamplePercent"; |
| /** Default is to use all the input splits */ |
| public static final float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f; |
| |
| /** |
| * To limit outlier input splits from producing too many vertices or to |
| * help with testing, the number of vertices loaded from an input split can |
| * be limited. By default, everything is loaded. |
| */ |
| public static final String INPUT_SPLIT_MAX_VERTICES = |
| "giraph.InputSplitMaxVertices"; |
| /** |
| * Default is that all the vertices are to be loaded from the input |
| * split |
| */ |
| public static final long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1; |
| |
| /** Java opts passed to ZooKeeper startup */ |
| public static final String ZOOKEEPER_JAVA_OPTS = |
| "giraph.zkJavaOpts"; |
| /** Default java opts passed to ZooKeeper startup */ |
| public static final String ZOOKEEPER_JAVA_OPTS_DEFAULT = |
| "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " + |
| "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100"; |
| |
| /** |
| * How often to checkpoint (i.e. 0, means no checkpoint, |
| * 1 means every superstep, 2 is every two supersteps, etc.). |
| */ |
| public static final String CHECKPOINT_FREQUENCY = |
| "giraph.checkpointFrequency"; |
| |
| /** Default checkpointing frequency of every 2 supersteps. */ |
| public static final int CHECKPOINT_FREQUENCY_DEFAULT = 2; |
| |
| /** |
| * Delete checkpoints after a successful job run? |
| */ |
| public static final String CLEANUP_CHECKPOINTS_AFTER_SUCCESS = |
| "giraph.cleanupCheckpointsAfterSuccess"; |
| /** Default is to clean up the checkponts after a successful job */ |
| public static final boolean CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT = |
| true; |
| |
| /** |
| * An application can be restarted manually by selecting a superstep. The |
| * corresponding checkpoint must exist for this to work. The user should |
| * set a long value. Default is start from scratch. |
| */ |
| public static final String RESTART_SUPERSTEP = "giraph.restartSuperstep"; |
| |
| /** |
| * If ZOOKEEPER_LIST is not set, then use this directory to manage |
| * ZooKeeper |
| */ |
| public static final String ZOOKEEPER_MANAGER_DIRECTORY = |
| "giraph.zkManagerDirectory"; |
| /** |
| * Default ZooKeeper manager directory (where determining the servers in |
| * HDFS files will go). Final directory path will also have job number |
| * for uniqueness. |
| */ |
| public static final String ZOOKEEPER_MANAGER_DIR_DEFAULT = |
| "_bsp/_defaultZkManagerDir"; |
| |
| /** This directory has/stores the available checkpoint files in HDFS. */ |
| public static final String CHECKPOINT_DIRECTORY = |
| "giraph.checkpointDirectory"; |
| /** |
| * Default checkpoint directory (where checkpoing files go in HDFS). Final |
| * directory path will also have the job number for uniqueness |
| */ |
| public static final String CHECKPOINT_DIRECTORY_DEFAULT = |
| "_bsp/_checkpoints/"; |
| |
| /** Keep the zookeeper output for debugging? Default is to remove it. */ |
| public static final String KEEP_ZOOKEEPER_DATA = |
| "giraph.keepZooKeeperData"; |
| /** Default is to remove ZooKeeper data. */ |
| public static final Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false; |
| |
| /** Default ZooKeeper tick time. */ |
| public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 6000; |
| /** Default ZooKeeper init limit (in ticks). */ |
| public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10; |
| /** Default ZooKeeper sync limit (in ticks). */ |
| public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5; |
| /** Default ZooKeeper snap count. */ |
| public static final int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000; |
| /** Default ZooKeeper maximum client connections. */ |
| public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000; |
| /** Default ZooKeeper minimum session timeout of 5 minutes (in msecs). */ |
| public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 300*1000; |
| /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */ |
| public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600*1000; |
| |
| /** Class logger */ |
| private static final Logger LOG = Logger.getLogger(GiraphJob.class); |
| |
| /** |
| * Constructor that will instantiate the configuration |
| * |
| * @param jobName User-defined job name |
| * @throws IOException |
| */ |
| public GiraphJob(String jobName) throws IOException { |
| super(new Configuration(), jobName); |
| } |
| |
| /** |
| * Constructor. |
| * |
| * @param conf User-defined configuration |
| * @param jobName User-defined job name |
| * @throws IOException |
| */ |
| public GiraphJob(Configuration conf, String jobName) throws IOException { |
| super(conf, jobName); |
| } |
| |
| /** |
| * Make sure the configuration is set properly by the user prior to |
| * submitting the job. |
| */ |
| private void checkConfiguration() { |
| if (conf.getInt(MAX_WORKERS, -1) < 0) { |
| throw new RuntimeException("No valid " + MAX_WORKERS); |
| } |
| if (conf.getFloat(MIN_PERCENT_RESPONDED, |
| MIN_PERCENT_RESPONDED_DEFAULT) <= 0.0f || |
| conf.getFloat(MIN_PERCENT_RESPONDED, |
| MIN_PERCENT_RESPONDED_DEFAULT) > 100.0f) { |
| throw new IllegalArgumentException( |
| "Invalid " + |
| conf.getFloat(MIN_PERCENT_RESPONDED, |
| MIN_PERCENT_RESPONDED_DEFAULT) + " for " + |
| MIN_PERCENT_RESPONDED); |
| } |
| if (conf.getInt(MIN_WORKERS, -1) < 0) { |
| throw new IllegalArgumentException("No valid " + MIN_WORKERS); |
| } |
| if (BspUtils.getVertexClass(getConfiguration()) == null) { |
| throw new IllegalArgumentException("GiraphJob: Null VERTEX_CLASS"); |
| } |
| if (BspUtils.getVertexInputFormatClass(getConfiguration()) == null) { |
| throw new IllegalArgumentException( |
| "GiraphJob: Null VERTEX_INPUT_FORMAT_CLASS"); |
| } |
| if (BspUtils.getVertexResolverClass(getConfiguration()) == null) { |
| setVertexResolverClass(VertexResolver.class); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("GiraphJob: No class found for " + |
| VERTEX_RESOLVER_CLASS + ", defaulting to " + |
| VertexResolver.class.getCanonicalName()); |
| } |
| } |
| } |
| |
| /** |
| * Set the vertex class (required) |
| * |
| * @param vertexClass Runs vertex computation |
| */ |
| final public void setVertexClass(Class<?> vertexClass) { |
| getConfiguration().setClass(VERTEX_CLASS, vertexClass, BasicVertex.class); |
| } |
| |
| /** |
| * Set the vertex input format class (required) |
| * |
| * @param vertexInputFormatClass Determines how graph is input |
| */ |
| final public void setVertexInputFormatClass( |
| Class<?> vertexInputFormatClass) { |
| getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS, |
| vertexInputFormatClass, |
| VertexInputFormat.class); |
| } |
| |
| /** |
| * Set the vertex output format class (optional) |
| * |
| * @param vertexOutputFormatClass Determines how graph is output |
| */ |
| final public void setVertexOutputFormatClass( |
| Class<?> vertexOutputFormatClass) { |
| getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS, |
| vertexOutputFormatClass, |
| VertexOutputFormat.class); |
| } |
| |
| /** |
| * Set the vertex combiner class (optional) |
| * |
| * @param vertexCombinerClass Determines how vertex messages are combined |
| */ |
| final public void setVertexCombinerClass(Class<?> vertexCombinerClass) { |
| getConfiguration().setClass(VERTEX_COMBINER_CLASS, |
| vertexCombinerClass, |
| VertexCombiner.class); |
| } |
| |
| /** |
| * Set the graph partitioner class (optional) |
| * |
| * @param graphPartitionerClass Determines how the graph is partitioned |
| */ |
| final public void setGraphPartitionerFactoryClass( |
| Class<?> graphPartitionerFactoryClass) { |
| getConfiguration().setClass(GRAPH_PARTITIONER_FACTORY_CLASS, |
| graphPartitionerFactoryClass, |
| GraphPartitionerFactory.class); |
| } |
| |
| /** |
| * Set the vertex resolver class (optional) |
| * |
| * @param vertexResolverClass Determines how vertex mutations are resolved |
| */ |
| final public void setVertexResolverClass(Class<?> vertexResolverClass) { |
| getConfiguration().setClass(VERTEX_RESOLVER_CLASS, |
| vertexResolverClass, |
| VertexResolver.class); |
| } |
| |
| /** |
| * Set the worker context class (optional) |
| * |
| * @param workerContextClass Determines what code is executed on a each |
| * worker before and after each superstep and computation |
| */ |
| final public void setWorkerContextClass(Class<?> workerContextClass) { |
| getConfiguration().setClass(WORKER_CONTEXT_CLASS, |
| workerContextClass, |
| WorkerContext.class); |
| } |
| |
| /** |
| * Set the aggregator writer class (optional) |
| * |
| * @param aggregatorWriterClass Determines how the aggregators are |
| * written to file at the end of the job |
| */ |
| final public void setAggregatorWriterClass( |
| Class<?> aggregatorWriterClass) { |
| getConfiguration().setClass(AGGREGATOR_WRITER_CLASS, |
| aggregatorWriterClass, |
| AggregatorWriter.class); |
| } |
| |
| /** |
| * Set worker configuration for determining what is required for |
| * a superstep. |
| * |
| * @param minWorkers Minimum workers to do a superstep |
| * @param maxWorkers Maximum workers to do a superstep |
| * (max map tasks in job) |
| * @param minPercentResponded 0 - 100 % of the workers required to |
| * have responded before continuing the superstep |
| */ |
| final public void setWorkerConfiguration(int minWorkers, |
| int maxWorkers, |
| float minPercentResponded) { |
| conf.setInt(MIN_WORKERS, minWorkers); |
| conf.setInt(MAX_WORKERS, maxWorkers); |
| conf.setFloat(MIN_PERCENT_RESPONDED, minPercentResponded); |
| } |
| |
| /** |
| * Utilize an existing ZooKeeper service. If this is not set, ZooKeeper |
| * will be dynamically started by Giraph for this job. |
| * |
| * @param serverList Comma separated list of servers and ports |
| * (i.e. zk1:2221,zk2:2221) |
| */ |
| final public void setZooKeeperConfiguration(String serverList) { |
| conf.set(ZOOKEEPER_LIST, serverList); |
| } |
| |
| /** |
| * Check if the configuration is local. If it is local, do additional |
| * checks due to the restrictions of LocalJobRunner. |
| * |
| * @param conf Configuration |
| */ |
| private static void checkLocalJobRunnerConfiguration( |
| Configuration conf) { |
| String jobTracker = conf.get("mapred.job.tracker", null); |
| if (!jobTracker.equals("local")) { |
| // Nothing to check |
| return; |
| } |
| |
| int maxWorkers = conf.getInt(MAX_WORKERS, -1); |
| if (maxWorkers != 1) { |
| throw new IllegalArgumentException( |
| "checkLocalJobRunnerConfiguration: When using " + |
| "LocalJobRunner, must have only one worker since " + |
| "only 1 task at a time!"); |
| } |
| if (conf.getBoolean(SPLIT_MASTER_WORKER, |
| SPLIT_MASTER_WORKER_DEFAULT)) { |
| throw new IllegalArgumentException( |
| "checkLocalJobRunnerConfiguration: When using " + |
| "LocalJobRunner, you cannot run in split master / worker " + |
| "mode since there is only 1 task at a time!"); |
| } |
| } |
| |
| /** |
| * Check whether a specified int conf value is set and if not, set it. |
| * |
| * @param param Conf value to check |
| * @param defaultValue Assign to value if not set |
| */ |
| private void setIntConfIfDefault(String param, int defaultValue) { |
| if (conf.getInt(param, Integer.MIN_VALUE) == Integer.MIN_VALUE) { |
| conf.setInt(param, defaultValue); |
| } |
| } |
| |
| /** |
| * Runs the actual graph application through Hadoop Map-Reduce. |
| * |
| * @param verbose If true, provide verbose output, false otherwise |
| * @throws ClassNotFoundException |
| * @throws InterruptedException |
| * @throws IOException |
| */ |
| final public boolean run(boolean verbose) |
| throws IOException, InterruptedException, ClassNotFoundException { |
| checkConfiguration(); |
| checkLocalJobRunnerConfiguration(conf); |
| setNumReduceTasks(0); |
| // Most users won't hit this hopefully and can set it higher if desired |
| setIntConfIfDefault("mapreduce.job.counters.limit", 512); |
| |
| // Capacity scheduler-specific settings. These should be enough for |
| // a reasonable Giraph job |
| setIntConfIfDefault("mapred.job.map.memory.mb", 1024); |
| setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024); |
| |
| // Speculative execution doesn't make sense for Giraph |
| conf.setBoolean("mapred.map.tasks.speculative.execution", false); |
| |
| // Set the ping interval to 5 minutes instead of one minute |
| // (DEFAULT_PING_INTERVAL) |
| Client.setPingInterval(conf, 60000*5); |
| |
| if (getJar() == null) { |
| setJarByClass(GiraphJob.class); |
| } |
| // Should work in MAPREDUCE-1938 to let the user jars/classes |
| // get loaded first |
| conf.setBoolean("mapreduce.user.classpath.first", true); |
| |
| setMapperClass(GraphMapper.class); |
| setInputFormatClass(BspInputFormat.class); |
| setOutputFormatClass(BspOutputFormat.class); |
| return waitForCompletion(verbose); |
| } |
| } |