blob: f025531269e28ad822d6b5fd619a88ac525dd12f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.top.window;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;
import static org.junit.Assert.assertEquals;
public class TestRollingWindowManager {
Configuration conf;
RollingWindowManager manager;
String[] users;
final static int MIN_2_MS = 60000;
final int WINDOW_LEN_MS = 1 * MIN_2_MS;
final int BUCKET_CNT = 10;
final int N_TOP_USERS = 10;
final int BUCKET_LEN = WINDOW_LEN_MS / BUCKET_CNT;
@Before
public void init() {
conf = new Configuration();
conf.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, BUCKET_CNT);
conf.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
manager = new RollingWindowManager(conf, WINDOW_LEN_MS);
users = new String[2 * N_TOP_USERS];
for (int i = 0; i < users.length; i++) {
users[i] = "user" + i;
}
}
@Test
public void testTops() throws Exception {
long time = WINDOW_LEN_MS + BUCKET_LEN * 3 / 2;
for (int i = 0; i < users.length; i++)
manager.recordMetric(time, "open", users[i], (i + 1) * 2);
time++;
for (int i = 0; i < users.length; i++)
manager.recordMetric(time, "close", users[i], i + 1);
time++;
TopWindow tops = manager.snapshot(time);
assertEquals("Unexpected number of ops", 3, tops.getOps().size());
assertEquals(TopConf.ALL_CMDS, tops.getOps().get(0).getOpType());
for (Op op : tops.getOps()) {
final List<User> topUsers = op.getTopUsers();
assertEquals("Unexpected number of users", N_TOP_USERS, topUsers.size());
if (op.getOpType().equals("open")) {
for (int i = 0; i < topUsers.size(); i++) {
User user = topUsers.get(i);
assertEquals("Unexpected count for user " + user.getUser(),
(users.length-i)*2, user.getCount());
}
// Closed form of sum(range(2,42,2))
assertEquals("Unexpected total count for op",
(2+(users.length*2))*(users.length/2),
op.getTotalCount());
}
}
// move the window forward not to see the "open" results
time += WINDOW_LEN_MS - 2;
tops = manager.snapshot(time);
assertEquals("Unexpected number of ops", 2, tops.getOps().size());
assertEquals(TopConf.ALL_CMDS, tops.getOps().get(0).getOpType());
final Op op = tops.getOps().get(1);
assertEquals("Should only see close ops", "close", op.getOpType());
final List<User> topUsers = op.getTopUsers();
for (int i = 0; i < topUsers.size(); i++) {
User user = topUsers.get(i);
assertEquals("Unexpected count for user " + user.getUser(),
(users.length-i), user.getCount());
}
// Closed form of sum(range(1,21))
assertEquals("Unexpected total count for op",
(1 + users.length) * (users.length / 2), op.getTotalCount());
}
@Test
public void windowReset() throws Exception {
Configuration config = new Configuration();
config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
int period = 2;
RollingWindowManager rollingWindowManager =
new RollingWindowManager(config, period);
rollingWindowManager.recordMetric(0, "op1", users[0], 3);
checkValues(rollingWindowManager, 0, "op1", 3, 3);
checkValues(rollingWindowManager, period - 1, "op1", 3, 3);
checkValues(rollingWindowManager, period, "op1", 0, 0);
}
@Test
public void testTotal() throws Exception {
Configuration config = new Configuration();
config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
int period = 10;
RollingWindowManager rollingWindowManager =
new RollingWindowManager(config, period);
long t = 0;
rollingWindowManager.recordMetric(t, "op1", users[0], 3);
checkValues(rollingWindowManager, t, "op1", 3, 3);
// both should have a value.
t = (long)(period * .5);
rollingWindowManager.recordMetric(t, "op2", users[0], 4);
checkValues(rollingWindowManager, t, "op1", 3, 7);
checkValues(rollingWindowManager, t, "op2", 4, 7);
// neither should reset.
t = period - 1;
checkValues(rollingWindowManager, t, "op1", 3, 7);
checkValues(rollingWindowManager, t, "op2", 4, 7);
// op1 should reset in its next period, but not op2.
t = period;
rollingWindowManager.recordMetric(10, "op1", users[0], 10);
checkValues(rollingWindowManager, t, "op1", 10, 14);
checkValues(rollingWindowManager, t, "op2", 4, 14);
// neither should reset.
t = (long)(period * 1.25);
rollingWindowManager.recordMetric(t, "op2", users[0], 7);
checkValues(rollingWindowManager, t, "op1", 10, 21);
checkValues(rollingWindowManager, t, "op2", 11, 21);
// op2 should reset.
t = (long)(period * 1.5);
rollingWindowManager.recordMetric(t, "op2", users[0], 13);
checkValues(rollingWindowManager, t, "op1", 10, 23);
checkValues(rollingWindowManager, t, "op2", 13, 23);
}
@Test
public void testWithFuzzing() throws Exception {
Configuration config = new Configuration();
config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
int period = users.length/2;
RollingWindowManager rollingWindowManager =
new RollingWindowManager(config, period);
String[] ops = {"op1", "op2", "op3", "op4"};
Random rand = new Random();
for (int i=0; i < 10000; i++) {
rollingWindowManager.recordMetric(i, ops[rand.nextInt(ops.length)],
users[rand.nextInt(users.length)],
rand.nextInt(100));
TopWindow window = rollingWindowManager.snapshot(i);
checkTotal(window);
}
}
@Test
public void testOpTotal() throws Exception {
int numTopUsers = 2;
Configuration config = new Configuration();
config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, numTopUsers);
int period = users.length/2;
RollingWindowManager rollingWindowManager =
new RollingWindowManager(config, period);
int numOps = 3;
rollingWindowManager.recordMetric(0, "op1", "user1", 10);
rollingWindowManager.recordMetric(0, "op1", "user2", 20);
rollingWindowManager.recordMetric(0, "op1", "user3", 30);
rollingWindowManager.recordMetric(0, "op2", "user1", 1);
rollingWindowManager.recordMetric(0, "op2", "user4", 40);
rollingWindowManager.recordMetric(0, "op2", "user5", 50);
rollingWindowManager.recordMetric(0, "op3", "user6", 1);
rollingWindowManager.recordMetric(0, "op3", "user7", 11);
rollingWindowManager.recordMetric(0, "op3", "user8", 1);
TopWindow window = rollingWindowManager.snapshot(0);
Assert.assertEquals(numOps + 1, window.getOps().size());
Op allOp = window.getOps().get(0);
Assert.assertEquals(TopConf.ALL_CMDS, allOp.getOpType());
List<User> topUsers = allOp.getTopUsers();
Assert.assertEquals(numTopUsers * numOps, topUsers.size());
// ensure all the top users for each op are present in the total op.
for (int i = 1; i < numOps; i++) {
Assert.assertTrue(
topUsers.containsAll(window.getOps().get(i).getTopUsers()));
}
}
private void checkValues(RollingWindowManager rwManager, long time,
String opType, long value, long expectedTotal) throws Exception {
TopWindow window = rwManager.snapshot(time);
for (Op windowOp : window.getOps()) {
if (opType.equals(windowOp.getOpType())) {
assertEquals(value, windowOp.getTotalCount());
break;
}
}
assertEquals(expectedTotal, checkTotal(window));
}
private long checkTotal(TopWindow window) throws Exception {
long allOpTotal = 0;
long computedOpTotal = 0;
Map<String, User> userOpTally = new HashMap<>();
for (String user : users) {
userOpTally.put(user, new User(user, 0));
}
for (Op windowOp : window.getOps()) {
int multiplier;
if (TopConf.ALL_CMDS.equals(windowOp.getOpType())) {
multiplier = -1;
allOpTotal += windowOp.getTotalCount();
} else {
multiplier = 1;
computedOpTotal += windowOp.getTotalCount();
}
for (User user : windowOp.getAllUsers()) {
userOpTally.get(user.getUser()).add((int)(multiplier*user.getCount()));
}
}
assertEquals(allOpTotal, computedOpTotal);
for (String user : userOpTally.keySet()) {
assertEquals(0, userOpTally.get(user).getCount());
}
return computedOpTotal;
}
}