Improve out-of-core metrics
Summary: For the metric showing the percentage of the graph in memory it makes more sense to show the lowest fraction of the graph that was in memory during a superstep. Basically, a user is more interested to see how bad was the out-of-core execution, and how many more machines he/she needs to use to run the job entirely in memory.
Test Plan:
mvn clean verify
visual, looking at Hadoop metric and per-worker metric
Reviewers: sergey.edunov, dionysis.logothetis, maja.kabiljo
Reviewed By: dionysis.logothetis, maja.kabiljo
Differential Revision: https://reviews.facebook.net/D59451
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
index 0cb8486..7e22d48 100644
--- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
@@ -62,6 +62,9 @@
/** aggregate bytes stored to local disks in out-of-core */
public static final String OOC_BYTES_STORED_NAME =
"Aggregate bytes stored to local disks (out-of-core)";
+ /** lowest percentage of graph in memory throughout the execution */
+ public static final String LOWEST_GRAPH_PERCENTAGE_IN_MEMORY_NAME =
+ "Lowest percentage of graph in memory so far (out-of-core)";
/** Singleton instance for everyone to use */
private static GiraphStats INSTANCE;
@@ -92,8 +95,10 @@
private static final int OOC_BYTES_LOADED = 11;
/** Aggregate OOC stored bytes counter */
private static final int OOC_BYTES_STORED = 12;
+ /** Lowest percentage of graph in memory over time */
+ private static final int LOWEST_GRAPH_PERCENTAGE_IN_MEMORY = 13;
/** Number of counters in this class */
- private static final int NUM_COUNTERS = 13;
+ private static final int NUM_COUNTERS = 14;
/** All the counters stored */
private final GiraphHadoopCounter[] counters;
@@ -123,6 +128,9 @@
getCounter(AGGREGATE_SENT_MESSAGE_BYTES_NAME);
counters[OOC_BYTES_LOADED] = getCounter(OOC_BYTES_LOADED_NAME);
counters[OOC_BYTES_STORED] = getCounter(OOC_BYTES_STORED_NAME);
+ counters[LOWEST_GRAPH_PERCENTAGE_IN_MEMORY] =
+ getCounter(LOWEST_GRAPH_PERCENTAGE_IN_MEMORY_NAME);
+ counters[LOWEST_GRAPH_PERCENTAGE_IN_MEMORY].setValue(100);
}
/**
@@ -260,6 +268,10 @@
return counters[OOC_BYTES_STORED];
}
+ public GiraphHadoopCounter getLowestGraphPercentageInMemory() {
+ return counters[LOWEST_GRAPH_PERCENTAGE_IN_MEMORY];
+ }
+
@Override
public Iterator<GiraphHadoopCounter> iterator() {
return Arrays.asList(counters).iterator();
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
index dab3c2f..5636260 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
@@ -46,6 +46,8 @@
private long oocStoreBytesCount = 0;
/** Bytes of data loaded to disk in the last superstep */
private long oocLoadBytesCount = 0;
+ /** Lowest percentage of graph in memory throughout the execution */
+ private int lowestGraphPercentageInMemory = 100;
/**
* Master's decision on whether we should checkpoint and
* what to do next.
@@ -108,6 +110,15 @@
this.checkpointStatus = checkpointStatus;
}
+ public int getLowestGraphPercentageInMemory() {
+ return lowestGraphPercentageInMemory;
+ }
+
+ public void setLowestGraphPercentageInMemory(
+ int lowestGraphPercentageInMemory) {
+ this.lowestGraphPercentageInMemory = lowestGraphPercentageInMemory;
+ }
+
/**
* Add bytes loaded to the global stats.
*
@@ -151,6 +162,9 @@
edgeCount = input.readLong();
messageCount = input.readLong();
messageBytesCount = input.readLong();
+ oocLoadBytesCount = input.readLong();
+ oocStoreBytesCount = input.readLong();
+ lowestGraphPercentageInMemory = input.readInt();
haltComputation = input.readBoolean();
if (input.readBoolean()) {
checkpointStatus = CheckpointStatus.values()[input.readInt()];
@@ -166,6 +180,9 @@
output.writeLong(edgeCount);
output.writeLong(messageCount);
output.writeLong(messageBytesCount);
+ output.writeLong(oocLoadBytesCount);
+ output.writeLong(oocStoreBytesCount);
+ output.writeInt(lowestGraphPercentageInMemory);
output.writeBoolean(haltComputation);
output.writeBoolean(checkpointStatus != null);
if (checkpointStatus != null) {
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
index e931a99..e265163 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
@@ -64,6 +64,13 @@
private int workerWithMinFreeMemory;
/** Minimum fraction of free memory on a worker */
private double minFreeMemoryFraction = Double.MAX_VALUE;
+ /**
+ * Minimum percentage of graph in memory in any worker so far in the
+ * computation
+ */
+ private int minGraphPercentageInMemory = 100;
+ /** Id of the worker with min percentage of graph in memory */
+ private int workerWithMinGraphPercentageInMemory = -1;
/**
* Constructor
@@ -116,6 +123,11 @@
minFreeMemoryFraction = Math.min(minFreeMemoryFraction,
workerProgress.getFreeMemoryFraction());
freeMemoryMB += workerProgress.getFreeMemoryMB();
+ int percentage = workerProgress.getLowestGraphPercentageInMemory();
+ if (percentage < minGraphPercentageInMemory) {
+ minGraphPercentageInMemory = percentage;
+ workerWithMinGraphPercentageInMemory = workerProgress.getTaskId();
+ }
}
if (!Iterables.isEmpty(workerProgresses)) {
freeMemoryMB /= Iterables.size(workerProgresses);
@@ -164,6 +176,12 @@
if (minFreeMemoryFraction < normalFreeMemoryFraction) {
sb.append(", ******* YOUR JOB IS RUNNING LOW ON MEMORY *******");
}
+ if (minGraphPercentageInMemory < 100) {
+ sb.append(" Spilling ")
+ .append(100 - minGraphPercentageInMemory)
+ .append("% of data to external storage on worker ")
+ .append(workerWithMinGraphPercentageInMemory);
+ }
return sb.toString();
}
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 8372bd3..605e818 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -946,6 +946,12 @@
workerMetrics.getBytesLoadedFromDisk());
globalStats.addOocStoreBytesCount(
workerMetrics.getBytesStoredOnDisk());
+ // Find the lowest percentage of graph in memory across all workers
+ // for one superstep
+ globalStats.setLowestGraphPercentageInMemory(
+ Math.min(globalStats.getLowestGraphPercentageInMemory(),
+ (int) Math.round(
+ workerMetrics.getGraphPercentageInMemory())));
aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
}
} catch (JSONException e) {
@@ -2058,5 +2064,10 @@
.increment(globalStats.getOocLoadBytesCount());
gs.getAggregateOOCBytesStored()
.increment(globalStats.getOocStoreBytesCount());
+ // Updating the lowest percentage of graph in memory throughout the
+ // execution across all the supersteps
+ int percentage = (int) gs.getLowestGraphPercentageInMemory().getValue();
+ gs.getLowestGraphPercentageInMemory().setValue(
+ Math.min(percentage, globalStats.getLowestGraphPercentageInMemory()));
}
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
index 219bcbd..e4281d9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
@@ -68,7 +68,7 @@
superstepGCTimer.setTimeUnit(TimeUnit.MILLISECONDS);
bytesLoadedFromDisk = 0;
bytesStoredOnDisk = 0;
- graphPercentageInMemory = 0;
+ graphPercentageInMemory = 100;
}
/**
@@ -93,8 +93,6 @@
registry.getExistingGauge(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
if (gauge != null) {
graphPercentageInMemory = gauge.value();
- } else {
- graphPercentageInMemory = 100;
}
return this;
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index d4c2de5..2037abe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -19,7 +19,7 @@
package org.apache.giraph.ooc;
import com.sun.management.GarbageCollectionNotificationInfo;
-import com.yammer.metrics.util.PercentGauge;
+import com.yammer.metrics.core.Gauge;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
@@ -485,15 +485,10 @@
@Override
public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
- superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new PercentGauge() {
+ superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() {
@Override
- protected double getNumerator() {
- return metaPartitionManager.getNumInMemoryPartitions();
- }
-
- @Override
- protected double getDenominator() {
- return metaPartitionManager.getNumPartitions();
+ public Double value() {
+ return metaPartitionManager.getLowestGraphFractionInMemory() * 100;
}
});
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 784d578..1332a3a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -20,9 +20,11 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AtomicDouble;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.giraph.worker.WorkerProgress;
import org.apache.log4j.Logger;
import java.util.ArrayList;
@@ -87,6 +89,16 @@
* processing
*/
private final Random randomGenerator;
+ /**
+ * What is the lowest fraction of partitions in memory, relative to the total
+ * number of available partitions? This is an indirect estimation of the
+ * amount of graph in memory, which can be used to estimate how many more
+ * machines needed to avoid out-of-core execution. At the beginning all the
+ * graph is in memory, so the fraction is 1. This fraction is calculated per
+ * superstep.
+ */
+ private final AtomicDouble lowestGraphFractionInMemory =
+ new AtomicDouble(1);
/**
* Constructor
@@ -125,6 +137,24 @@
return partitions.size();
}
+ public double getLowestGraphFractionInMemory() {
+ return lowestGraphFractionInMemory.get();
+ }
+
+ /**
+ * Update the lowest fraction of graph in memory so to have a more accurate
+ * information in one of the counters.
+ */
+ private synchronized void updateGraphFractionInMemory() {
+ double graphInMemory =
+ (double) getNumInMemoryPartitions() / getNumPartitions();
+ if (graphInMemory < lowestGraphFractionInMemory.get()) {
+ lowestGraphFractionInMemory.set(graphInMemory);
+ WorkerProgress.get().updateLowestGraphPercentageInMemory(
+ (int) (graphInMemory * 100));
+ }
+ }
+
/**
* Whether a given partition is available
*
@@ -592,6 +622,7 @@
*/
public void doneOffloadingPartition(int partitionId) {
numInMemoryPartitions.getAndDecrement();
+ updateGraphFractionInMemory();
MetaPartition meta = partitions.get(partitionId);
int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
synchronized (meta) {
@@ -618,6 +649,8 @@
dictionary.reset();
}
numPartitionsProcessed.set(0);
+ lowestGraphFractionInMemory.set((double) getNumInMemoryPartitions() /
+ getNumPartitions());
}
/**
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
index eb543cd..4065869 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
@@ -181,6 +181,17 @@
freeMemoryFraction = MemoryUtils.freeMemoryFraction();
}
+ /**
+ * Update lowest percentage of graph which stayed in memory so far in the
+ * execution
+ *
+ * @param fraction the fraction of graph in memory so far in this superstep
+ */
+ public synchronized void updateLowestGraphPercentageInMemory(int fraction) {
+ lowestGraphPercentageInMemory =
+ Math.min(lowestGraphPercentageInMemory, fraction);
+ }
+
@ThriftField(1)
public synchronized long getCurrentSuperstep() {
return currentSuperstep;
@@ -281,6 +292,11 @@
return freeMemoryFraction;
}
+ @ThriftField(21)
+ public synchronized int getLowestGraphPercentageInMemory() {
+ return lowestGraphPercentageInMemory;
+ }
+
public synchronized boolean isInputSuperstep() {
return currentSuperstep == -1;
}
@@ -392,4 +408,10 @@
public synchronized void setTaskId(int taskId) {
this.taskId = taskId;
}
+
+ @ThriftField
+ public synchronized void setLowestGraphPercentageInMemory(
+ int lowestGraphPercentageInMemory) {
+ this.lowestGraphPercentageInMemory = lowestGraphPercentageInMemory;
+ }
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java
index 04ed2ea..583b073 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java
@@ -72,6 +72,9 @@
/** Fraction of memory that's free */
protected double freeMemoryFraction;
+ /** Lowest percentage of graph in memory throughout the execution so far */
+ protected int lowestGraphPercentageInMemory = 100;
+
public boolean isInputSuperstep() {
return currentSuperstep == -1;
}