// blob: 2bd2d964b3b74764036782b13d9eaf7cccff333e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.graph;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.SuperstepState;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.log4j.Logger;
/**
 * Master thread that will coordinate the activities of the tasks. It runs
 * on all task processes, however, will only execute its algorithm if it knows
 * it is the "leader" from ZooKeeper.
 *
 * @param <I> Vertex index type
 * @param <V> Vertex value type
 * @param <E> Edge value type
 * @param <M> Message value type
 */
@SuppressWarnings("rawtypes")
public class MasterThread<I extends WritableComparable,
    V extends Writable,
    E extends Writable,
    M extends Writable> extends Thread {
  /** Counter group name for the Giraph timers */
  public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME =
      "Giraph Timers";
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(MasterThread.class);
  /** Reference to shared BspService */
  private final CentralizedServiceMaster<I, V, E, M> bspServiceMaster;
  /** Context (for counters) */
  private final Context context;
  /** Use superstep counters? */
  private final boolean superstepCounterOn;
  /** Setup seconds */
  private double setupSecs = 0d;
  /** Superstep timer (in seconds) map, ordered by superstep number */
  private final Map<Long, Double> superstepSecsMap =
      new TreeMap<Long, Double>();

  /**
   * Constructor.
   *
   * @param bspServiceMaster Master that already exists and setup() has
   *        been called.
   * @param context Mapper context, used to read the job configuration and
   *        to publish the timer counters.
   */
  MasterThread(BspServiceMaster<I, V, E, M> bspServiceMaster,
      Context context) {
    super(MasterThread.class.getName());
    this.bspServiceMaster = bspServiceMaster;
    this.context = context;
    superstepCounterOn = context.getConfiguration().getBoolean(
        GiraphJob.USE_SUPERSTEP_COUNTERS,
        GiraphJob.USE_SUPERSTEP_COUNTERS_DEFAULT);
  }

  /**
   * The master algorithm. The algorithm should be able to withstand
   * failures and resume as necessary since the master may switch during a
   * job.
   *
   * Any exception raised while running the algorithm is logged and rethrown
   * wrapped in a {@link RuntimeException}.
   */
  @Override
  public void run() {
    // Algorithm:
    // 1. Become the master
    // 2. If desired, restart from a manual checkpoint
    // 3. Run all supersteps until complete
    try {
      long startMillis = System.currentTimeMillis();
      long endMillis = 0;
      bspServiceMaster.setup();
      if (bspServiceMaster.becomeMaster()) {
        // Attempt to create InputSplits if necessary. Bail out if that fails.
        // Short-circuit: input splits are only created when no restarted
        // superstep has been set.
        if (bspServiceMaster.getRestartedSuperstep() !=
            BspService.UNSET_SUPERSTEP ||
            bspServiceMaster.createInputSplits() != -1) {
          long setupMillis = System.currentTimeMillis() - startMillis;
          context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
              "Setup (milliseconds)").increment(setupMillis);
          setupSecs = setupMillis / 1000.0d;
          SuperstepState superstepState = SuperstepState.INITIAL;
          long cachedSuperstep = BspService.UNSET_SUPERSTEP;
          while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
            long startSuperstepMillis = System.currentTimeMillis();
            // Remember the superstep being coordinated; the log below
            // re-reads the current superstep after coordination.
            cachedSuperstep = bspServiceMaster.getSuperstep();
            superstepState = bspServiceMaster.coordinateSuperstep();
            long superstepMillis =
                System.currentTimeMillis() - startSuperstepMillis;
            superstepSecsMap.put(Long.valueOf(cachedSuperstep),
                superstepMillis / 1000.0d);
            if (LOG.isInfoEnabled()) {
              LOG.info("masterThread: Coordination of superstep " +
                  cachedSuperstep + " took " +
                  superstepMillis / 1000.0d +
                  " seconds ended with state " + superstepState +
                  " and is now on superstep " +
                  bspServiceMaster.getSuperstep());
            }
            if (superstepCounterOn) {
              incrementSuperstepCounter(cachedSuperstep, superstepMillis);
            }
            // If a worker failed, restart from a known good superstep
            if (superstepState == SuperstepState.WORKER_FAILURE) {
              bspServiceMaster.restartFromCheckpoint(
                  bspServiceMaster.getLastGoodCheckpoint());
            }
            // Shutdown time is measured from the end of the last iteration.
            endMillis = System.currentTimeMillis();
          }
          bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
        }
      }
      bspServiceMaster.cleanup();
      // Timings are only reported if at least one superstep was coordinated
      // by this thread (which also guarantees endMillis was set).
      if (!superstepSecsMap.isEmpty()) {
        reportTimings(startMillis, endMillis);
      }
    } catch (Exception e) {
      LOG.error("masterThread: Master algorithm failed: ", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Add the elapsed time of one coordinated superstep to its own counter in
   * the Giraph timers counter group.
   *
   * @param superstep Superstep that was coordinated
   * @param superstepMillis Time the coordination took, in milliseconds
   */
  private void incrementSuperstepCounter(long superstep,
      long superstepMillis) {
    String counterPrefix;
    // NOTE(review): -1 presumably equals BspService.INPUT_SUPERSTEP (which
    // reportTimings() uses for the same label) -- confirm before unifying.
    if (superstep == -1) {
      counterPrefix = "Vertex input superstep";
    } else {
      counterPrefix = "Superstep " + superstep;
    }
    context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
        counterPrefix + " (milliseconds)").increment(superstepMillis);
  }

  /**
   * Publish the shutdown and total counters and log the setup, per-superstep,
   * shutdown and total times.
   *
   * @param startMillis Time at which {@link #run()} started, in milliseconds
   * @param endMillis Time at which the last superstep iteration ended, in
   *        milliseconds
   */
  private void reportTimings(long startMillis, long endMillis) {
    // Sample the clock once so the counter and the log line agree.
    long shutdownMillis = System.currentTimeMillis() - endMillis;
    context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
        "Shutdown (milliseconds)").increment(shutdownMillis);
    if (LOG.isInfoEnabled()) {
      LOG.info("setup: Took " + setupSecs + " seconds.");
      for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
        if (entry.getKey().longValue() == BspService.INPUT_SUPERSTEP) {
          LOG.info("vertex input superstep: Took " +
              entry.getValue() + " seconds.");
        } else {
          LOG.info("superstep " + entry.getKey() + ": Took " +
              entry.getValue() + " seconds.");
        }
      }
      LOG.info("shutdown: Took " + shutdownMillis / 1000.0d + " seconds.");
    }
    // Total is wall-clock time since run() started (the previous code
    // subtracted the setup seconds from the epoch time instead).
    long totalMillis = System.currentTimeMillis() - startMillis;
    if (LOG.isInfoEnabled()) {
      LOG.info("total: Took " + totalMillis / 1000.0d + " seconds.");
    }
    context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
        "Total (milliseconds)").increment(totalMillis);
  }
}