blob: 882c4f409a0555436fcbd824438e91a7f027ad36 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.job;
import com.google.common.collect.Iterables;
import org.apache.giraph.conf.FloatConfOption;
import org.apache.giraph.master.MasterProgress;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.giraph.worker.WorkerProgressStats;
import org.apache.hadoop.conf.Configuration;
import javax.annotation.concurrent.NotThreadSafe;
import java.text.DecimalFormat;
/**
 * Class which combines multiple workers' progresses to get overall
 * application progress.
 *
 * All aggregation is performed once, in the constructor, over the worker
 * progresses passed in. The remaining methods only read the aggregated
 * values.
 */
@NotThreadSafe
public class CombinedWorkerProgress extends WorkerProgressStats {
  /**
   * Decimal format which prints at most two decimal places.
   * {@link DecimalFormat} is not synchronized, so this class formats with a
   * private copy of this instance rather than with the instance itself.
   */
  public static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#.##");
  /**
   * If free memory fraction on some worker goes below this value,
   * warning will be printed
   */
  public static final FloatConfOption NORMAL_FREE_MEMORY_FRACTION =
      new FloatConfOption("giraph.normalFreeMemoryFraction", 0.1f,
          "If free memory fraction on some worker goes below this value, " +
          "warning will be printed");
  /**
   * If free memory fraction on some worker goes below this value,
   * warning will be printed
   */
  private double normalFreeMemoryFraction;
  /**
   * How many workers have reported that they are in highest reported
   * superstep
   */
  private int workersInSuperstep = 0;
  /**
   * How many workers reported that they finished application
   */
  private int workersDone = 0;
  /** Minimum amount of free memory on a worker */
  private double minFreeMemoryMB = Double.MAX_VALUE;
  /** Task id of the worker with min free memory */
  private int workerWithMinFreeMemory;
  /** Minimum fraction of free memory on a worker */
  private double minFreeMemoryFraction = Double.MAX_VALUE;
  /**
   * Minimum percentage of graph in memory in any worker so far in the
   * computation
   */
  private int minGraphPercentageInMemory = 100;
  /** Id of the worker with min percentage of graph in memory */
  private int workerWithMinGraphPercentageInMemory = -1;
  /** Master progress */
  private MasterProgress masterProgress;

  /**
   * Constructor. Walks the worker progresses exactly once and accumulates
   * the combined statistics.
   *
   * @param workerProgresses Worker progresses to combine
   * @param masterProgress Master progress
   * @param conf Configuration
   */
  public CombinedWorkerProgress(Iterable<WorkerProgress> workerProgresses,
      MasterProgress masterProgress, Configuration conf) {
    this.masterProgress = masterProgress;
    normalFreeMemoryFraction = NORMAL_FREE_MEMORY_FRACTION.get(conf);
    // Counted in the same pass that sums free memory, so the average below
    // is always computed from a matching sum and count
    int workerCount = 0;
    for (WorkerProgress workerProgress : workerProgresses) {
      workerCount++;
      if (workerProgress.getCurrentSuperstep() > currentSuperstep) {
        // A worker reported a later superstep: restart the compute counters
        // so that they only cover the highest superstep seen so far
        verticesToCompute = 0;
        verticesComputed = 0;
        partitionsToCompute = 0;
        partitionsComputed = 0;
        currentSuperstep = workerProgress.getCurrentSuperstep();
        workersInSuperstep = 0;
      }

      // Only workers in the highest superstep contribute to phase counters
      if (workerProgress.getCurrentSuperstep() == currentSuperstep) {
        workersInSuperstep++;
        if (isInputSuperstep()) {
          verticesLoaded += workerProgress.getVerticesLoaded();
          vertexInputSplitsLoaded +=
              workerProgress.getVertexInputSplitsLoaded();
          edgesLoaded += workerProgress.getEdgesLoaded();
          edgeInputSplitsLoaded += workerProgress.getEdgeInputSplitsLoaded();
        } else if (isComputeSuperstep()) {
          verticesToCompute += workerProgress.getVerticesToCompute();
          verticesComputed += workerProgress.getVerticesComputed();
          partitionsToCompute += workerProgress.getPartitionsToCompute();
          partitionsComputed += workerProgress.getPartitionsComputed();
        } else if (isOutputSuperstep()) {
          verticesToStore += workerProgress.getVerticesToStore();
          verticesStored += workerProgress.getVerticesStored();
          partitionsToStore += workerProgress.getPartitionsToStore();
          partitionsStored += workerProgress.getPartitionsStored();
        }
      }

      if (workerProgress.isStoringDone()) {
        workersDone++;
      }

      // Memory statistics are gathered from every worker, whatever its
      // superstep
      if (workerProgress.getFreeMemoryMB() < minFreeMemoryMB) {
        minFreeMemoryMB = workerProgress.getFreeMemoryMB();
        workerWithMinFreeMemory = workerProgress.getTaskId();
      }
      minFreeMemoryFraction = Math.min(minFreeMemoryFraction,
          workerProgress.getFreeMemoryFraction());
      freeMemoryMB += workerProgress.getFreeMemoryMB();

      int percentage = workerProgress.getLowestGraphPercentageInMemory();
      if (percentage < minGraphPercentageInMemory) {
        minGraphPercentageInMemory = percentage;
        workerWithMinGraphPercentageInMemory = workerProgress.getTaskId();
      }
    }
    // Turn the accumulated sum into an average; nothing to do for no workers
    if (workerCount > 0) {
      freeMemoryMB /= workerCount;
    }
  }

  /**
   * Is the application done
   *
   * @param expectedWorkersDone Number of workers which should be done in
   *                            order for application to be done
   * @return True if application is done
   */
  public boolean isDone(int expectedWorkersDone) {
    return workersDone == expectedWorkersDone;
  }

  /**
   * Get string describing total job progress
   *
   * @return String describing total job progress; empty if the current
   *         superstep is neither input, compute nor output superstep
   */
  protected String getProgressString() {
    StringBuilder sb = new StringBuilder();
    if (isInputSuperstep()) {
      // A section is printed unless master reports its split count as set
      // and that count is not positive
      boolean showVertices = !masterProgress.vertexInputSplitsSet() ||
          masterProgress.getVertexInputSplitCount() > 0;
      boolean showEdges = !masterProgress.edgeInputSplitsSet() ||
          masterProgress.getEdgeInputSplitsCount() > 0;
      sb.append("Loading data: ");
      if (showVertices) {
        sb.append(verticesLoaded).append(" vertices loaded, ");
        sb.append(vertexInputSplitsLoaded).append(
            " vertex input splits loaded");
        if (masterProgress.getVertexInputSplitCount() > 0) {
          sb.append(" (out of ").append(
              masterProgress.getVertexInputSplitCount()).append(")");
        }
      }
      // Separator goes only between two printed sections, so that the
      // string never ends with a dangling "; "
      if (showVertices && showEdges) {
        sb.append("; ");
      }
      if (showEdges) {
        sb.append(edgesLoaded).append(" edges loaded, ");
        sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded");
        if (masterProgress.getEdgeInputSplitsCount() > 0) {
          sb.append(" (out of ").append(
              masterProgress.getEdgeInputSplitsCount()).append(")");
        }
      }
    } else if (isComputeSuperstep()) {
      sb.append("Compute superstep ").append(currentSuperstep).append(": ");
      sb.append(verticesComputed).append(" out of ").append(
          verticesToCompute).append(" vertices computed; ");
      sb.append(partitionsComputed).append(" out of ").append(
          partitionsToCompute).append(" partitions computed");
    } else if (isOutputSuperstep()) {
      sb.append("Storing data: ");
      sb.append(verticesStored).append(" out of ").append(
          verticesToStore).append(" vertices stored; ");
      sb.append(partitionsStored).append(" out of ").append(
          partitionsToStore).append(" partitions stored");
    }
    return sb.toString();
  }

  /**
   * Describe the combined progress: number of reporting workers, the
   * progress string, memory statistics, a low memory warning when the
   * minimum free memory fraction is below the configured threshold, and a
   * spilling note when some worker has less than 100% of graph in memory.
   *
   * @return Human readable summary of the combined progress
   */
  @Override
  public String toString() {
    // DecimalFormat is not synchronized: format with a private copy so that
    // instances used from different threads never share formatter state
    DecimalFormat format = (DecimalFormat) DECIMAL_FORMAT.clone();
    StringBuilder sb = new StringBuilder();
    sb.append("Data from ").append(workersInSuperstep).append(" workers - ");
    sb.append(getProgressString());
    sb.append("; min free memory on worker ").append(
        workerWithMinFreeMemory).append(" - ").append(
        format.format(minFreeMemoryMB)).append("MB, average ").append(
        format.format(freeMemoryMB)).append("MB");
    if (minFreeMemoryFraction < normalFreeMemoryFraction) {
      sb.append(", ******* YOUR JOB IS RUNNING LOW ON MEMORY *******");
    }
    if (minGraphPercentageInMemory < 100) {
      sb.append(" Spilling ")
          .append(100 - minGraphPercentageInMemory)
          .append("% of data to external storage on worker ")
          .append(workerWithMinGraphPercentageInMemory);
    }
    return sb.toString();
  }

  /**
   * Check if this instance made progress from another instance
   *
   * @param lastProgress Instance to compare with
   * @return True iff progress was made
   */
  public boolean madeProgressFrom(CombinedWorkerProgress lastProgress) {
    // If progress strings are different there was progress made
    if (!getProgressString().equals(lastProgress.getProgressString())) {
      return true;
    }
    // If more workers were done there was progress made
    return workersDone != lastProgress.workersDone;
  }
}