HDFS-15707. NNTop counts don't add up as expected. (#2516) Contributed by Ahmed Hussein and Daryn Sharp
(cherry picked from commit 6a5864ee4ac387a390642f56aff8581fb5ed8ffd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
index 4d61d0f..124a10a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
@@ -23,7 +23,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;
@@ -137,8 +136,6 @@
for (RollingWindowManager rollingWindowManager : rollingWindowManagers
.values()) {
rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
- rollingWindowManager.recordMetric(currTime,
- TopConf.ALL_CMDS, userName, 1);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java
index 63ff125..65611f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java
@@ -112,8 +112,8 @@
* as well as atomic fields.
*/
private class Bucket {
- AtomicLong value = new AtomicLong(0);
- AtomicLong updateTime = new AtomicLong(0);
+ private AtomicLong value = new AtomicLong(0);
+ private AtomicLong updateTime = new AtomicLong(-1); // -1 = never updated.
/**
* Check whether the last time that the bucket was updated is no longer
@@ -124,7 +124,7 @@
*/
boolean isStaleNow(long time) {
long utime = updateTime.get();
- return time - utime >= windowLenMs;
+ return (utime == -1) || (time - utime >= windowLenMs);
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
index bdd0ab0..c3a68ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
@@ -17,20 +17,22 @@
*/
package org.apache.hadoop.hdfs.server.namenode.top.window;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
-import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,11 +68,15 @@
public TopWindow(int windowMillis) {
this.windowMillis = windowMillis;
- this.top = Lists.newArrayList();
+ this.top = new LinkedList<>();
}
public void addOp(Op op) {
- top.add(op);
+ if (op.getOpType().equals(TopConf.ALL_CMDS)) {
+ top.add(0, op);
+ } else {
+ top.add(op);
+ }
}
public int getWindowLenMs() {
@@ -86,41 +92,59 @@
* Represents an operation within a TopWindow. It contains a ranked
* set of the top users for the operation.
*/
- public static class Op {
+ public static class Op implements Comparable<Op> {
private final String opType;
- private final List<User> topUsers;
+ private final List<User> users;
private final long totalCount;
+ private final int limit;
- public Op(String opType, long totalCount) {
+ public Op(String opType, UserCounts users, int limit) {
this.opType = opType;
- this.topUsers = Lists.newArrayList();
- this.totalCount = totalCount;
- }
-
- public void addUser(User u) {
- topUsers.add(u);
+ this.users = new ArrayList<>(users);
+ this.users.sort(Collections.reverseOrder());
+ this.totalCount = users.getTotal();
+ this.limit = limit;
}
public String getOpType() {
return opType;
}
+ public List<User> getAllUsers() {
+ return users;
+ }
+
public List<User> getTopUsers() {
- return topUsers;
+ return (users.size() > limit) ? users.subList(0, limit) : users;
}
public long getTotalCount() {
return totalCount;
}
+
+ @Override
+ public int compareTo(Op other) {
+ return Long.signum(totalCount - other.totalCount);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof Op) && totalCount == ((Op)o).totalCount;
+ }
+
+ @Override
+ public int hashCode() {
+ return opType.hashCode();
+ }
}
/**
* Represents a user who called an Op within a TopWindow. Specifies the
* user and the number of times the user called the operation.
*/
- public static class User {
+ public static class User implements Comparable<User> {
private final String user;
- private final long count;
+ private long count;
public User(String user, long count) {
this.user = user;
@@ -134,6 +158,56 @@
public long getCount() {
return count;
}
+
+ public void add(long delta) {
+ count += delta;
+ }
+
+ @Override
+ public int compareTo(User other) {
+ return Long.signum(count - other.count);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof User) && user.equals(((User)o).user);
+ }
+
+ @Override
+ public int hashCode() {
+ return user.hashCode();
+ }
+ }
+
+ private static class UserCounts extends ArrayList<User> {
+ private long total = 0;
+
+ UserCounts(int capacity) {
+ super(capacity);
+ }
+
+ @Override
+ public boolean add(User user) {
+ long count = user.getCount();
+ int i = indexOf(user);
+ if (i == -1) {
+ super.add(new User(user.getUser(), count));
+ } else {
+ get(i).add(count);
+ }
+ total += count;
+ return true;
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends User> users) {
+ users.forEach(user -> add(user));
+ return true;
+ }
+
+ public long getTotal() {
+ return total;
+ }
}
/**
@@ -142,7 +216,7 @@
* operated on that metric.
*/
public ConcurrentHashMap<String, RollingWindowMap> metricMap =
- new ConcurrentHashMap<String, RollingWindowMap>();
+ new ConcurrentHashMap<>();
public RollingWindowManager(Configuration conf, int reportingPeriodMs) {
@@ -184,35 +258,33 @@
*
* @param time the current time
* @return a TopWindow describing the top users for each metric in the
- * window.
+ * window.
*/
public TopWindow snapshot(long time) {
TopWindow window = new TopWindow(windowLenMs);
Set<String> metricNames = metricMap.keySet();
LOG.debug("iterating in reported metrics, size={} values={}",
metricNames.size(), metricNames);
+ UserCounts totalCounts = new UserCounts(metricMap.size());
for (Map.Entry<String, RollingWindowMap> entry : metricMap.entrySet()) {
String metricName = entry.getKey();
RollingWindowMap rollingWindows = entry.getValue();
- TopN topN = getTopUsersForMetric(time, metricName, rollingWindows);
- final int size = topN.size();
- if (size == 0) {
- continue;
- }
- Op op = new Op(metricName, topN.getTotal());
- window.addOp(op);
- // Reverse the users from the TopUsers using a stack,
- // since we'd like them sorted in descending rather than ascending order
- Stack<NameValuePair> reverse = new Stack<NameValuePair>();
- for (int i = 0; i < size; i++) {
- reverse.push(topN.poll());
- }
- for (int i = 0; i < size; i++) {
- NameValuePair userEntry = reverse.pop();
- User user = new User(userEntry.getName(), userEntry.getValue());
- op.addUser(user);
+ UserCounts topN = getTopUsersForMetric(time, metricName, rollingWindows);
+ if (!topN.isEmpty()) {
+ window.addOp(new Op(metricName, topN, topUsersCnt));
+ totalCounts.addAll(topN);
}
}
+ // synthesize the overall total op count with the top users for every op.
+ Set<User> topUsers = new HashSet<>();
+ for (Op op : window.getOps()) {
+ topUsers.addAll(op.getTopUsers());
+ }
+ // intersect totals with the top users.
+ totalCounts.retainAll(topUsers);
+ // allowed to exceed the per-op topUsersCnt to capture total ops for
+ // any user
+ window.addOp(new Op(TopConf.ALL_CMDS, totalCounts, Integer.MAX_VALUE));
return window;
}
@@ -223,9 +295,9 @@
* @param metricName Name of metric
* @return
*/
- private TopN getTopUsersForMetric(long time, String metricName,
+ private UserCounts getTopUsersForMetric(long time, String metricName,
RollingWindowMap rollingWindows) {
- TopN topN = new TopN(topUsersCnt);
+ UserCounts topN = new UserCounts(topUsersCnt);
Iterator<Map.Entry<String, RollingWindow>> iterator =
rollingWindows.entrySet().iterator();
while (iterator.hasNext()) {
@@ -242,7 +314,7 @@
}
LOG.debug("offer window of metric: {} userName: {} sum: {}",
metricName, userName, windowSum);
- topN.offer(new NameValuePair(userName, windowSum));
+ topN.add(new User(userName, windowSum));
}
LOG.debug("topN users size for command {} is: {}",
metricName, topN.size());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
index 494ed08..f025531 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hdfs.server.namenode.top.window;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -56,7 +60,7 @@
}
@Test
- public void testTops() {
+ public void testTops() throws Exception {
long time = WINDOW_LEN_MS + BUCKET_LEN * 3 / 2;
for (int i = 0; i < users.length; i++)
manager.recordMetric(time, "open", users[i], (i + 1) * 2);
@@ -66,11 +70,12 @@
time++;
TopWindow tops = manager.snapshot(time);
- assertEquals("Unexpected number of ops", 2, tops.getOps().size());
+ assertEquals("Unexpected number of ops", 3, tops.getOps().size());
+ assertEquals(TopConf.ALL_CMDS, tops.getOps().get(0).getOpType());
for (Op op : tops.getOps()) {
final List<User> topUsers = op.getTopUsers();
assertEquals("Unexpected number of users", N_TOP_USERS, topUsers.size());
- if (op.getOpType() == "open") {
+ if (op.getOpType().equals("open")) {
for (int i = 0; i < topUsers.size(); i++) {
User user = topUsers.get(i);
assertEquals("Unexpected count for user " + user.getUser(),
@@ -86,8 +91,9 @@
// move the window forward not to see the "open" results
time += WINDOW_LEN_MS - 2;
tops = manager.snapshot(time);
- assertEquals("Unexpected number of ops", 1, tops.getOps().size());
- final Op op = tops.getOps().get(0);
+ assertEquals("Unexpected number of ops", 2, tops.getOps().size());
+ assertEquals(TopConf.ALL_CMDS, tops.getOps().get(0).getOpType());
+ final Op op = tops.getOps().get(1);
assertEquals("Should only see close ops", "close", op.getOpType());
final List<User> topUsers = op.getTopUsers();
for (int i = 0; i < topUsers.size(); i++) {
@@ -99,4 +105,158 @@
assertEquals("Unexpected total count for op",
(1 + users.length) * (users.length / 2), op.getTotalCount());
}
+
+ @Test
+ public void windowReset() throws Exception {
+ Configuration config = new Configuration();
+ config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
+ config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
+ int period = 2;
+ RollingWindowManager rollingWindowManager =
+ new RollingWindowManager(config, period);
+ rollingWindowManager.recordMetric(0, "op1", users[0], 3);
+ checkValues(rollingWindowManager, 0, "op1", 3, 3);
+ checkValues(rollingWindowManager, period - 1, "op1", 3, 3);
+ checkValues(rollingWindowManager, period, "op1", 0, 0);
+ }
+
+ @Test
+ public void testTotal() throws Exception {
+ Configuration config = new Configuration();
+ config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
+ config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
+ int period = 10;
+ RollingWindowManager rollingWindowManager =
+ new RollingWindowManager(config, period);
+
+ long t = 0;
+ rollingWindowManager.recordMetric(t, "op1", users[0], 3);
+ checkValues(rollingWindowManager, t, "op1", 3, 3);
+
+ // both should have a value.
+ t = (long)(period * .5);
+ rollingWindowManager.recordMetric(t, "op2", users[0], 4);
+ checkValues(rollingWindowManager, t, "op1", 3, 7);
+ checkValues(rollingWindowManager, t, "op2", 4, 7);
+
+ // neither should reset.
+ t = period - 1;
+ checkValues(rollingWindowManager, t, "op1", 3, 7);
+ checkValues(rollingWindowManager, t, "op2", 4, 7);
+
+ // op1 should reset in its next period, but not op2.
+ t = period;
+ rollingWindowManager.recordMetric(10, "op1", users[0], 10);
+ checkValues(rollingWindowManager, t, "op1", 10, 14);
+ checkValues(rollingWindowManager, t, "op2", 4, 14);
+
+ // neither should reset.
+ t = (long)(period * 1.25);
+ rollingWindowManager.recordMetric(t, "op2", users[0], 7);
+ checkValues(rollingWindowManager, t, "op1", 10, 21);
+ checkValues(rollingWindowManager, t, "op2", 11, 21);
+
+ // op2 should reset.
+ t = (long)(period * 1.5);
+ rollingWindowManager.recordMetric(t, "op2", users[0], 13);
+ checkValues(rollingWindowManager, t, "op1", 10, 23);
+ checkValues(rollingWindowManager, t, "op2", 13, 23);
+ }
+
+ @Test
+ public void testWithFuzzing() throws Exception {
+ Configuration config = new Configuration();
+ config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
+ config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
+ int period = users.length/2;
+ RollingWindowManager rollingWindowManager =
+ new RollingWindowManager(config, period);
+
+ String[] ops = {"op1", "op2", "op3", "op4"};
+ Random rand = new Random();
+ for (int i=0; i < 10000; i++) {
+ rollingWindowManager.recordMetric(i, ops[rand.nextInt(ops.length)],
+ users[rand.nextInt(users.length)],
+ rand.nextInt(100));
+ TopWindow window = rollingWindowManager.snapshot(i);
+ checkTotal(window);
+ }
+ }
+
+ @Test
+ public void testOpTotal() throws Exception {
+ int numTopUsers = 2;
+ Configuration config = new Configuration();
+ config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
+ config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, numTopUsers);
+ int period = users.length/2;
+ RollingWindowManager rollingWindowManager =
+ new RollingWindowManager(config, period);
+
+ int numOps = 3;
+ rollingWindowManager.recordMetric(0, "op1", "user1", 10);
+ rollingWindowManager.recordMetric(0, "op1", "user2", 20);
+ rollingWindowManager.recordMetric(0, "op1", "user3", 30);
+
+ rollingWindowManager.recordMetric(0, "op2", "user1", 1);
+ rollingWindowManager.recordMetric(0, "op2", "user4", 40);
+ rollingWindowManager.recordMetric(0, "op2", "user5", 50);
+
+ rollingWindowManager.recordMetric(0, "op3", "user6", 1);
+ rollingWindowManager.recordMetric(0, "op3", "user7", 11);
+ rollingWindowManager.recordMetric(0, "op3", "user8", 1);
+
+ TopWindow window = rollingWindowManager.snapshot(0);
+ Assert.assertEquals(numOps + 1, window.getOps().size());
+
+ Op allOp = window.getOps().get(0);
+ Assert.assertEquals(TopConf.ALL_CMDS, allOp.getOpType());
+ List<User> topUsers = allOp.getTopUsers();
+ Assert.assertEquals(numTopUsers * numOps, topUsers.size());
+ // ensure all the top users for each op are present in the total op.
+ for (int i = 1; i < numOps; i++) {
+ Assert.assertTrue(
+ topUsers.containsAll(window.getOps().get(i).getTopUsers()));
+ }
+ }
+
+ private void checkValues(RollingWindowManager rwManager, long time,
+ String opType, long value, long expectedTotal) throws Exception {
+ TopWindow window = rwManager.snapshot(time);
+ for (Op windowOp : window.getOps()) {
+ if (opType.equals(windowOp.getOpType())) {
+ assertEquals(value, windowOp.getTotalCount());
+ break;
+ }
+ }
+ assertEquals(expectedTotal, checkTotal(window));
+ }
+
+ private long checkTotal(TopWindow window) throws Exception {
+ long allOpTotal = 0;
+ long computedOpTotal = 0;
+
+ Map<String, User> userOpTally = new HashMap<>();
+ for (String user : users) {
+ userOpTally.put(user, new User(user, 0));
+ }
+ for (Op windowOp : window.getOps()) {
+ int multiplier;
+ if (TopConf.ALL_CMDS.equals(windowOp.getOpType())) {
+ multiplier = -1;
+ allOpTotal += windowOp.getTotalCount();
+ } else {
+ multiplier = 1;
+ computedOpTotal += windowOp.getTotalCount();
+ }
+ for (User user : windowOp.getAllUsers()) {
+ userOpTally.get(user.getUser()).add((int)(multiplier*user.getCount()));
+ }
+ }
+ assertEquals(allOpTotal, computedOpTotal);
+ for (String user : userOpTally.keySet()) {
+ assertEquals(0, userOpTally.get(user).getCount());
+ }
+ return computedOpTotal;
+ }
}