blob: 7cc14faa5a205da9fec250a4d89cd576390074ae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.stats;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.generated.WorkerSummary;
import org.apache.storm.scheduler.WorkerSlot;
import org.junit.Assert;
import org.junit.Test;
public class TestStatsUtil {
/*
* aggWorkerStats tests
*/
private Map<Integer, String> task2Component = new HashMap<Integer, String>();
private Map<List<Integer>, Map<String, Object>> beats = new HashMap<List<Integer>, Map<String, Object>>();
private Map<List<Long>, List<Object>> exec2NodePort = new HashMap<List<Long>, List<Object>>();
private Map<String, String> nodeHost = new HashMap<String, String>();
private Map<WorkerSlot, WorkerResources> worker2Resources = new HashMap<WorkerSlot, WorkerResources>();
private List<Long> makeExecutorId(int firstTask, int lastTask) {
return Arrays.asList(new Long(firstTask), new Long(lastTask));
}
public void makeTopoInfo() {
List<Object> hostPort = new ArrayList<Object>();
hostPort.add("node1");
hostPort.add(new Long(1));
exec2NodePort.put(makeExecutorId(1, 1), hostPort);
nodeHost.put("node1", "host1");
nodeHost.put("node2", "host2");
nodeHost.put("node3", "host3");
List<Integer> exec1 = new ArrayList<Integer>();
exec1.add(1);
exec1.add(1);
HashMap<String, Object> exec1Beat = new HashMap<String, Object>();
exec1Beat.put("uptime", 100);
// should not be returned since this executor is not part of the topology's assignment
List<Integer> exec2 = new ArrayList<Integer>();
exec2.add(2);
exec2.add(4);
HashMap<String, Object> exec2Beat = new HashMap<String, Object>();
exec2Beat.put("uptime", 200);
beats.put(exec1, exec1Beat);
beats.put(exec2, exec2Beat);
task2Component.put(1, "my-component");
task2Component.put(2, "__sys1");
task2Component.put(3, "__sys2");
task2Component.put(4, "__sys3");
task2Component.put(5, "__sys4");
task2Component.put(6, "__sys4");
task2Component.put(7, "my-component2");
WorkerResources ws1 = new WorkerResources();
ws1.set_mem_on_heap(1);
ws1.set_mem_off_heap(2);
ws1.set_cpu(3);
worker2Resources.put(new WorkerSlot("node1", 1), ws1);
WorkerResources ws2 = new WorkerResources();
ws2.set_mem_on_heap(4);
ws2.set_mem_off_heap(8);
ws2.set_cpu(12);
worker2Resources.put(new WorkerSlot("node2", 2), ws2);
WorkerResources ws3 = new WorkerResources();
ws3.set_mem_on_heap(16);
ws3.set_mem_off_heap(32);
ws3.set_cpu(48);
worker2Resources.put(new WorkerSlot("node3", 3), ws3);
}
private void makeTopoInfoWithSysWorker() {
makeTopoInfo();
List<Object> secondWorker = new ArrayList<Object>();
secondWorker.add("node2");
secondWorker.add(new Long(2));
exec2NodePort.put(makeExecutorId(2, 4), secondWorker);
}
private void makeTopoInfoWithMissingBeats() {
makeTopoInfo();
List<Object> thirdWorker = new ArrayList<Object>();
thirdWorker.add("node3");
thirdWorker.add(new Long(3));
exec2NodePort.put(makeExecutorId(5, 7), thirdWorker);
}
private List<WorkerSummary> checkWorkerStats(boolean includeSys, boolean userAuthorized, String filterSupervisor) {
List<WorkerSummary> summaries =
StatsUtil.aggWorkerStats("my-storm-id", "my-storm-name",
task2Component, beats, exec2NodePort, nodeHost, worker2Resources,
includeSys, userAuthorized, filterSupervisor, null);
for (WorkerSummary ws : summaries) {
String host = ws.get_host();
int port = ws.get_port();
Assert.assertEquals("my-storm-id", ws.get_topology_id());
Assert.assertEquals("my-storm-name", ws.get_topology_name());
boolean includeSupervisor = filterSupervisor == null || filterSupervisor.equals(host);
switch (port) {
case 1:
Assert.assertEquals("host1", ws.get_host());
Assert.assertEquals("node1", ws.get_supervisor_id());
Assert.assertEquals(1, ws.get_num_executors());
Assert.assertEquals(100, ws.get_uptime_secs());
Assert.assertEquals(1.0, ws.get_assigned_memonheap(), 0.001);
Assert.assertEquals(2.0, ws.get_assigned_memoffheap(), 0.001);
Assert.assertEquals(3.0, ws.get_assigned_cpu(), 0.001);
break;
case 2:
Assert.assertEquals("host2", ws.get_host());
Assert.assertEquals("node2", ws.get_supervisor_id());
Assert.assertEquals(1, ws.get_num_executors());
Assert.assertEquals(200, ws.get_uptime_secs());
Assert.assertEquals(4.0, ws.get_assigned_memonheap(), 0.001);
Assert.assertEquals(8.0, ws.get_assigned_memoffheap(), 0.001);
Assert.assertEquals(12.0, ws.get_assigned_cpu(), 0.001);
break;
case 3:
Assert.assertEquals("host3", ws.get_host());
Assert.assertEquals("node3", ws.get_supervisor_id());
Assert.assertEquals(1, ws.get_num_executors());
// no heartbeat for this one, should be 0
Assert.assertEquals(0, ws.get_uptime_secs());
Assert.assertEquals(16.0, ws.get_assigned_memonheap(), 0.001);
Assert.assertEquals(32.0, ws.get_assigned_memoffheap(), 0.001);
Assert.assertEquals(48.0, ws.get_assigned_cpu(), 0.001);
break;
}
}
// get the worker count back s.t. we can assert in each test function
return summaries;
}
private WorkerSummary getWorkerSummaryForPort(List<WorkerSummary> summaries, int port) {
//iterate of WorkerSummary and find the one with the port
for (WorkerSummary ws : summaries) {
if (ws.get_port() == port) {
return ws;
}
}
return null;
}
@Test
public void aggWorkerStats() {
makeTopoInfo();
List<WorkerSummary> summaries = checkWorkerStats(true /*include sys*/,
true /*user authorized*/,
null /*filter supervisor*/);
WorkerSummary ws = getWorkerSummaryForPort(summaries, 1);
Assert.assertEquals(1, ws.get_component_to_num_tasks().size());
Assert.assertEquals(1, ws.get_component_to_num_tasks().get("my-component").intValue());
Assert.assertEquals(1, summaries.size());
}
@Test
public void aggWorkerStatsWithSystemComponents() {
makeTopoInfoWithSysWorker();
List<WorkerSummary> summaries = checkWorkerStats(true /*include sys*/,
true /*user authorized*/,
null /*filter supervisor*/);
WorkerSummary ws = getWorkerSummaryForPort(summaries, 2);
// since we made sys components visible, the component map has all system components
Assert.assertEquals(3, ws.get_component_to_num_tasks().size());
Assert.assertEquals(1, ws.get_component_to_num_tasks().get("__sys1").intValue());
Assert.assertEquals(1, ws.get_component_to_num_tasks().get("__sys2").intValue());
Assert.assertEquals(1, ws.get_component_to_num_tasks().get("__sys3").intValue());
Assert.assertEquals(2, summaries.size());
}
@Test
public void aggWorkerStatsWithHiddenSystemComponents() {
makeTopoInfoWithSysWorker();
List<WorkerSummary> summaries = checkWorkerStats(false /*DON'T include sys*/,
true /*user authorized*/,
null /*filter supervisor*/);
WorkerSummary ws1 = getWorkerSummaryForPort(summaries, 1);
WorkerSummary ws2 = getWorkerSummaryForPort(summaries, 2);
Assert.assertEquals(1, ws1.get_component_to_num_tasks().size());
// since we made sys components hidden, the component map is empty for this worker
Assert.assertEquals(0, ws2.get_component_to_num_tasks().size());
Assert.assertEquals(2, summaries.size());
}
@Test
public void aggWorkerStatsForUnauthorizedUser() {
makeTopoInfoWithSysWorker();
List<WorkerSummary> summaries = checkWorkerStats(true /*include sys (should not matter)*/,
false /*user NOT authorized*/,
null /*filter supervisor*/);
WorkerSummary ws1 = getWorkerSummaryForPort(summaries, 1);
WorkerSummary ws2 = getWorkerSummaryForPort(summaries, 2);
// since we made user not authorized, component map is empty
Assert.assertEquals(0, ws1.get_component_to_num_tasks().size());
Assert.assertEquals(0, ws2.get_component_to_num_tasks().size());
Assert.assertEquals(2, summaries.size());
}
@Test
public void aggWorkerStatsFilterSupervisor() {
makeTopoInfoWithMissingBeats();
List<WorkerSummary> summaries = checkWorkerStats(true /*include sys*/,
true /*user authorized*/,
"node3" /*filter supervisor*/);
WorkerSummary ws = getWorkerSummaryForPort(summaries, 3);
// only host3 should be returned given filter
Assert.assertEquals(2, ws.get_component_to_num_tasks().size());
Assert.assertEquals(2, ws.get_component_to_num_tasks().get("__sys4").intValue());
Assert.assertEquals(1, ws.get_component_to_num_tasks().get("my-component2").intValue());
Assert.assertEquals(1, summaries.size());
}
@Test
public void aggWorkerStatsFilterSupervisorAndHideSystemComponents() {
makeTopoInfoWithMissingBeats();
List<WorkerSummary> summaries = checkWorkerStats(false /*DON'T include sys*/,
true /*user authorized*/,
"node3" /*filter supervisor*/);
WorkerSummary ws = getWorkerSummaryForPort(summaries, 3);
// hidden sys component
Assert.assertEquals(1, ws.get_component_to_num_tasks().size());
Assert.assertEquals(1, ws.get_component_to_num_tasks().get("my-component2").intValue());
Assert.assertEquals(1, summaries.size());
}
}