Fix removal of MasterLoggingAggregator: restore its registration and logging on the master
Test Plan:
mvn clean install
How should I test this?
Reviewers: pavanka, maja.kabiljo
Reviewed By: maja.kabiljo
Differential Revision: https://reviews.facebook.net/D27651
(cherry picked from commit 39237cf7fbf0742a8d7200db4039e5b43a58eca8)
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
index 539bd7d..850e3ec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
@@ -18,15 +18,19 @@
package org.apache.giraph.benchmark;
+import java.io.IOException;
+import java.util.Set;
+
import org.apache.commons.cli.CommandLine;
import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
import org.apache.giraph.master.DefaultMasterCompute;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.MasterLoggingAggregator;
import org.apache.giraph.worker.DefaultWorkerContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
@@ -35,9 +39,6 @@
import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.util.Set;
-
/**
* Benchmark for aggregators. Also checks the correctness.
*/
@@ -123,6 +124,7 @@
public void preSuperstep() {
addToWorkerAggregators(1);
checkAggregators();
+ MasterLoggingAggregator.aggregate("everything fine", this, getConf());
}
@Override
@@ -214,6 +216,7 @@
conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 1);
conf.setInt(AGGREGATORS_NUM, AGGREGATORS.getOptionIntValue(cmd));
conf.setInt("workers", conf.getInt(GiraphConstants.MAX_WORKERS, -1));
+ MasterLoggingAggregator.setUseMasterLoggingAggregator(true, conf);
}
/**
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
index 36a4553..c13d7bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
@@ -26,6 +26,7 @@
import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.MasterLoggingAggregator;
import org.apache.hadoop.io.Writable;
import org.apache.log4j.Logger;
@@ -71,6 +72,7 @@
MasterGlobalCommUsage globalComm) {
this.conf = conf;
this.globalComm = globalComm;
+ MasterLoggingAggregator.registerAggregator(this, conf);
}
@Override
@@ -143,6 +145,11 @@
initAggregatorValues.clear();
}
+ /** Call MasterLoggingAggregator.logAggregatedValue before master compute */
+ public void prepareSuperstep() {
+ MasterLoggingAggregator.logAggregatedValue(this, conf);
+ }
+
@Override
public <A extends Writable> boolean registerAggregator(String name,
Class<? extends Aggregator<A>> aggregatorClass) throws
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 62b089c..39b4a1c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -1668,6 +1668,8 @@
// Collect aggregator values, then run the master.compute() and
// finally save the aggregator values
globalCommHandler.prepareSuperstep();
+ aggregatorTranslation.prepareSuperstep();
+
SuperstepClasses superstepClasses =
prepareMasterCompute(getSuperstep() + 1);
doMasterCompute();