HDFS-15707. NNTop counts don't add up as expected. (#2516) Contributed by Ahmed Hussein and Daryn Sharp

(cherry picked from commit 6a5864ee4ac387a390642f56aff8581fb5ed8ffd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
index 4d61d0f..124a10a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
@@ -23,7 +23,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;
@@ -137,8 +136,6 @@
     for (RollingWindowManager rollingWindowManager : rollingWindowManagers
         .values()) {
       rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
-      rollingWindowManager.recordMetric(currTime,
-          TopConf.ALL_CMDS, userName, 1);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java
index 63ff125..65611f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java
@@ -112,8 +112,8 @@
    * as well as atomic fields.
    */
   private class Bucket {
-    AtomicLong value = new AtomicLong(0);
-    AtomicLong updateTime = new AtomicLong(0);
+    private AtomicLong value = new AtomicLong(0);
+    private AtomicLong updateTime = new AtomicLong(-1); // -1 = never updated.
 
     /**
      * Check whether the last time that the bucket was updated is no longer
@@ -124,7 +124,7 @@
      */
     boolean isStaleNow(long time) {
       long utime = updateTime.get();
-      return time - utime >= windowLenMs;
+      return (utime == -1) || (time - utime >= windowLenMs);
     }
 
     /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
index bdd0ab0..c3a68ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
@@ -17,20 +17,22 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.top.window;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Stack;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
-import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,11 +68,15 @@
 
     public TopWindow(int windowMillis) {
       this.windowMillis = windowMillis;
-      this.top = Lists.newArrayList();
+      this.top = new LinkedList<>();
     }
 
     public void addOp(Op op) {
-      top.add(op);
+      if (op.getOpType().equals(TopConf.ALL_CMDS)) {
+        top.add(0, op);
+      } else {
+        top.add(op);
+      }
     }
 
     public int getWindowLenMs() {
@@ -86,41 +92,59 @@
    * Represents an operation within a TopWindow. It contains a ranked 
    * set of the top users for the operation.
    */
-  public static class Op {
+  public static class Op implements Comparable<Op> {
     private final String opType;
-    private final List<User> topUsers;
+    private final List<User> users;
     private final long totalCount;
+    private final int limit;
 
-    public Op(String opType, long totalCount) {
+    public Op(String opType, UserCounts users, int limit) {
       this.opType = opType;
-      this.topUsers = Lists.newArrayList();
-      this.totalCount = totalCount;
-    }
-
-    public void addUser(User u) {
-      topUsers.add(u);
+      this.users = new ArrayList<>(users);
+      this.users.sort(Collections.reverseOrder());
+      this.totalCount = users.getTotal();
+      this.limit = limit;
     }
 
     public String getOpType() {
       return opType;
     }
 
+    public List<User> getAllUsers() {
+      return users;
+    }
+
     public List<User> getTopUsers() {
-      return topUsers;
+      return (users.size() > limit) ? users.subList(0, limit) : users;
     }
 
     public long getTotalCount() {
       return totalCount;
     }
+
+    @Override
+    public int compareTo(Op other) {
+      return Long.signum(totalCount - other.totalCount);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return (o instanceof Op) && totalCount == ((Op)o).totalCount;
+    }
+
+    @Override
+    public int hashCode() {
+      return opType.hashCode();
+    }
   }
 
   /**
    * Represents a user who called an Op within a TopWindow. Specifies the 
    * user and the number of times the user called the operation.
    */
-  public static class User {
+  public static class User implements Comparable<User> {
     private final String user;
-    private final long count;
+    private long count;
 
     public User(String user, long count) {
       this.user = user;
@@ -134,6 +158,56 @@
     public long getCount() {
       return count;
     }
+
+    public void add(long delta) {
+      count += delta;
+    }
+
+    @Override
+    public int compareTo(User other) {
+      return Long.signum(count - other.count);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return (o instanceof User) && user.equals(((User)o).user);
+    }
+
+    @Override
+    public int hashCode() {
+      return user.hashCode();
+    }
+  }
+
+  private static class UserCounts extends ArrayList<User> {
+    private long total = 0;
+
+    UserCounts(int capacity) {
+      super(capacity);
+    }
+
+    @Override
+    public boolean add(User user) {
+      long count = user.getCount();
+      int i = indexOf(user);
+      if (i == -1) {
+        super.add(new User(user.getUser(), count));
+      } else {
+        get(i).add(count);
+      }
+      total += count;
+      return true;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends User> users) {
+      users.forEach(user -> add(user));
+      return true;
+    }
+
+    public long getTotal() {
+      return total;
+    }
   }
 
   /**
@@ -142,7 +216,7 @@
    * operated on that metric.
    */
   public ConcurrentHashMap<String, RollingWindowMap> metricMap =
-      new ConcurrentHashMap<String, RollingWindowMap>();
+      new ConcurrentHashMap<>();
 
   public RollingWindowManager(Configuration conf, int reportingPeriodMs) {
     
@@ -184,35 +258,33 @@
    *
    * @param time the current time
    * @return a TopWindow describing the top users for each metric in the 
-   * window.
+   *         window.
    */
   public TopWindow snapshot(long time) {
     TopWindow window = new TopWindow(windowLenMs);
     Set<String> metricNames = metricMap.keySet();
     LOG.debug("iterating in reported metrics, size={} values={}",
         metricNames.size(), metricNames);
+    UserCounts totalCounts = new UserCounts(metricMap.size());
     for (Map.Entry<String, RollingWindowMap> entry : metricMap.entrySet()) {
       String metricName = entry.getKey();
       RollingWindowMap rollingWindows = entry.getValue();
-      TopN topN = getTopUsersForMetric(time, metricName, rollingWindows);
-      final int size = topN.size();
-      if (size == 0) {
-        continue;
-      }
-      Op op = new Op(metricName, topN.getTotal());
-      window.addOp(op);
-      // Reverse the users from the TopUsers using a stack, 
-      // since we'd like them sorted in descending rather than ascending order
-      Stack<NameValuePair> reverse = new Stack<NameValuePair>();
-      for (int i = 0; i < size; i++) {
-        reverse.push(topN.poll());
-      }
-      for (int i = 0; i < size; i++) {
-        NameValuePair userEntry = reverse.pop();
-        User user = new User(userEntry.getName(), userEntry.getValue());
-        op.addUser(user);
+      UserCounts topN = getTopUsersForMetric(time, metricName, rollingWindows);
+      if (!topN.isEmpty()) {
+        window.addOp(new Op(metricName, topN, topUsersCnt));
+        totalCounts.addAll(topN);
       }
     }
+    // synthesize the overall total op count with the top users for every op.
+    Set<User> topUsers = new HashSet<>();
+    for (Op op : window.getOps()) {
+      topUsers.addAll(op.getTopUsers());
+    }
+    // intersect totals with the top users.
+    totalCounts.retainAll(topUsers);
+    // allowed to exceed the per-op topUsersCnt to capture total ops for
+    // any user
+    window.addOp(new Op(TopConf.ALL_CMDS, totalCounts, Integer.MAX_VALUE));
     return window;
   }
 
@@ -223,9 +295,9 @@
    * @param metricName Name of metric
    * @return
    */
-  private TopN getTopUsersForMetric(long time, String metricName, 
+  private UserCounts getTopUsersForMetric(long time, String metricName,
       RollingWindowMap rollingWindows) {
-    TopN topN = new TopN(topUsersCnt);
+    UserCounts topN = new UserCounts(topUsersCnt);
     Iterator<Map.Entry<String, RollingWindow>> iterator =
         rollingWindows.entrySet().iterator();
     while (iterator.hasNext()) {
@@ -242,7 +314,7 @@
       }
       LOG.debug("offer window of metric: {} userName: {} sum: {}",
           metricName, userName, windowSum);
-      topN.offer(new NameValuePair(userName, windowSum));
+      topN.add(new User(userName, windowSum));
     }
     LOG.debug("topN users size for command {} is: {}",
         metricName, topN.size());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
index 494ed08..f025531 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.top.window;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -56,7 +60,7 @@
   }
 
   @Test
-  public void testTops() {
+  public void testTops() throws Exception {
     long time = WINDOW_LEN_MS + BUCKET_LEN * 3 / 2;
     for (int i = 0; i < users.length; i++)
       manager.recordMetric(time, "open", users[i], (i + 1) * 2);
@@ -66,11 +70,12 @@
     time++;
     TopWindow tops = manager.snapshot(time);
 
-    assertEquals("Unexpected number of ops", 2, tops.getOps().size());
+    assertEquals("Unexpected number of ops", 3, tops.getOps().size());
+    assertEquals(TopConf.ALL_CMDS, tops.getOps().get(0).getOpType());
     for (Op op : tops.getOps()) {
       final List<User> topUsers = op.getTopUsers();
       assertEquals("Unexpected number of users", N_TOP_USERS, topUsers.size());
-      if (op.getOpType() == "open") {
+      if (op.getOpType().equals("open")) {
         for (int i = 0; i < topUsers.size(); i++) {
           User user = topUsers.get(i);
           assertEquals("Unexpected count for user " + user.getUser(),
@@ -86,8 +91,9 @@
     // move the window forward not to see the "open" results
     time += WINDOW_LEN_MS - 2;
     tops = manager.snapshot(time);
-    assertEquals("Unexpected number of ops", 1, tops.getOps().size());
-    final Op op = tops.getOps().get(0);
+    assertEquals("Unexpected number of ops", 2, tops.getOps().size());
+    assertEquals(TopConf.ALL_CMDS, tops.getOps().get(0).getOpType());
+    final Op op = tops.getOps().get(1);
     assertEquals("Should only see close ops", "close", op.getOpType());
     final List<User> topUsers = op.getTopUsers();
     for (int i = 0; i < topUsers.size(); i++) {
@@ -99,4 +105,158 @@
     assertEquals("Unexpected total count for op",
         (1 + users.length) * (users.length / 2), op.getTotalCount());
   }
+
+  @Test
+  public void windowReset() throws Exception {
+    Configuration config = new Configuration();
+    config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
+    config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
+    int period = 2;
+    RollingWindowManager rollingWindowManager =
+        new RollingWindowManager(config, period);
+    rollingWindowManager.recordMetric(0, "op1", users[0], 3);
+    checkValues(rollingWindowManager, 0, "op1", 3, 3);
+    checkValues(rollingWindowManager, period - 1, "op1", 3, 3);
+    checkValues(rollingWindowManager, period, "op1", 0, 0);
+  }
+
+  @Test
+  public void testTotal() throws Exception {
+    Configuration config = new Configuration();
+    config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
+    config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
+    int period = 10;
+    RollingWindowManager rollingWindowManager =
+        new RollingWindowManager(config, period);
+
+    long t = 0;
+    rollingWindowManager.recordMetric(t, "op1", users[0], 3);
+    checkValues(rollingWindowManager, t, "op1", 3, 3);
+
+    // both should have a value.
+    t = (long)(period * .5);
+    rollingWindowManager.recordMetric(t, "op2", users[0], 4);
+    checkValues(rollingWindowManager, t, "op1", 3, 7);
+    checkValues(rollingWindowManager, t, "op2", 4, 7);
+
+    // neither should reset.
+    t = period - 1;
+    checkValues(rollingWindowManager, t, "op1", 3, 7);
+    checkValues(rollingWindowManager, t, "op2", 4, 7);
+
+    // op1 should reset in its next period, but not op2.
+    t = period;
+    rollingWindowManager.recordMetric(10, "op1", users[0], 10);
+    checkValues(rollingWindowManager, t, "op1", 10, 14);
+    checkValues(rollingWindowManager, t, "op2", 4, 14);
+
+    // neither should reset.
+    t = (long)(period * 1.25);
+    rollingWindowManager.recordMetric(t, "op2", users[0], 7);
+    checkValues(rollingWindowManager, t, "op1", 10, 21);
+    checkValues(rollingWindowManager, t, "op2", 11, 21);
+
+    // op2 should reset.
+    t = (long)(period * 1.5);
+    rollingWindowManager.recordMetric(t, "op2", users[0], 13);
+    checkValues(rollingWindowManager, t, "op1", 10, 23);
+    checkValues(rollingWindowManager, t, "op2", 13, 23);
+  }
+
+  @Test
+  public void testWithFuzzing() throws Exception {
+    Configuration config = new Configuration();
+    config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
+    config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
+    int period = users.length/2;
+    RollingWindowManager rollingWindowManager =
+        new RollingWindowManager(config, period);
+
+    String[] ops = {"op1", "op2", "op3", "op4"};
+    Random rand = new Random();
+    for (int i=0; i < 10000; i++) {
+      rollingWindowManager.recordMetric(i, ops[rand.nextInt(ops.length)],
+          users[rand.nextInt(users.length)],
+          rand.nextInt(100));
+      TopWindow window = rollingWindowManager.snapshot(i);
+      checkTotal(window);
+    }
+  }
+
+  @Test
+  public void testOpTotal() throws Exception {
+    int numTopUsers = 2;
+    Configuration config = new Configuration();
+    config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
+    config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, numTopUsers);
+    int period = users.length/2;
+    RollingWindowManager rollingWindowManager =
+        new RollingWindowManager(config, period);
+
+    int numOps = 3;
+    rollingWindowManager.recordMetric(0, "op1", "user1", 10);
+    rollingWindowManager.recordMetric(0, "op1", "user2", 20);
+    rollingWindowManager.recordMetric(0, "op1", "user3", 30);
+
+    rollingWindowManager.recordMetric(0, "op2", "user1", 1);
+    rollingWindowManager.recordMetric(0, "op2", "user4", 40);
+    rollingWindowManager.recordMetric(0, "op2", "user5", 50);
+
+    rollingWindowManager.recordMetric(0, "op3", "user6", 1);
+    rollingWindowManager.recordMetric(0, "op3", "user7", 11);
+    rollingWindowManager.recordMetric(0, "op3", "user8", 1);
+
+    TopWindow window = rollingWindowManager.snapshot(0);
+    Assert.assertEquals(numOps + 1, window.getOps().size());
+
+    Op allOp = window.getOps().get(0);
+    Assert.assertEquals(TopConf.ALL_CMDS, allOp.getOpType());
+    List<User> topUsers = allOp.getTopUsers();
+    Assert.assertEquals(numTopUsers * numOps, topUsers.size());
+    // ensure all the top users for each op are present in the total op.
+    for (int i = 1; i < numOps; i++) {
+      Assert.assertTrue(
+          topUsers.containsAll(window.getOps().get(i).getTopUsers()));
+    }
+  }
+
+  private void checkValues(RollingWindowManager rwManager, long time,
+      String opType, long value, long expectedTotal) throws Exception {
+    TopWindow window = rwManager.snapshot(time);
+    for (Op windowOp : window.getOps()) {
+      if (opType.equals(windowOp.getOpType())) {
+        assertEquals(value, windowOp.getTotalCount());
+        break;
+      }
+    }
+    assertEquals(expectedTotal, checkTotal(window));
+  }
+
+  private long checkTotal(TopWindow window) throws Exception {
+    long allOpTotal = 0;
+    long computedOpTotal = 0;
+
+    Map<String, User> userOpTally = new HashMap<>();
+    for (String user : users) {
+      userOpTally.put(user, new User(user, 0));
+    }
+    for (Op windowOp : window.getOps()) {
+      int multiplier;
+      if (TopConf.ALL_CMDS.equals(windowOp.getOpType())) {
+        multiplier = -1;
+        allOpTotal += windowOp.getTotalCount();
+      } else {
+        multiplier = 1;
+        computedOpTotal += windowOp.getTotalCount();
+      }
+      for (User user : windowOp.getAllUsers()) {
+        userOpTally.get(user.getUser()).add((int)(multiplier*user.getCount()));
+      }
+    }
+    assertEquals(allOpTotal, computedOpTotal);
+    for (String user : userOpTally.keySet()) {
+      assertEquals(0, userOpTally.get(user).getCount());
+    }
+    return computedOpTotal;
+  }
 }