blob: bc02d6cd6617afdb412bf8cb0262c783b1c89f88 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.helix;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.helix.common.ZkTestBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ThreadLeakageChecker {
private final static Logger LOG = LoggerFactory.getLogger(ThreadLeakageChecker.class);
private static ThreadGroup getRootThreadGroup() {
ThreadGroup candidate = Thread.currentThread().getThreadGroup();
while (candidate.getParent() != null) {
candidate = candidate.getParent();
}
return candidate;
}
private static List<Thread> getAllThreads() {
ThreadGroup rootThreadGroup = getRootThreadGroup();
Thread[] threads = new Thread[32];
int count = rootThreadGroup.enumerate(threads);
while (count == threads.length) {
threads = new Thread[threads.length * 2];
count = rootThreadGroup.enumerate(threads);
}
return Arrays.asList(Arrays.copyOf(threads, count));
}
private static final String[] ZKSERVER_THRD_PATTERN =
{"SessionTracker", "NIOServerCxn", "SyncThread:", "ProcessThread"};
private static final String[] ZKSESSION_THRD_PATTERN =
new String[]{"ZkClient-EventThread", "ZkClient-AsyncCallback", "-EventThread", "-SendThread"};
private static final String[] FORKJOIN_THRD_PATTERN = new String[]{"ForkJoinPool"};
private static final String[] TIMER_THRD_PATTERN = new String[]{"time"};
private static final String[] TASKSTATEMODEL_THRD_PATTERN = new String[]{"TaskStateModel"};
/*
* The two threshold -- warning and limit, are mostly empirical.
*
* ZkServer, current version has only 4 threads. In case later version use more, we the limit to 100.
* The reasoning is that these ZkServer threads are not deemed as leaking no matter how much they have.
*
* ZkSession is the ZkClient and native Zookeeper client we have. ZkTestBase has 12 at starting up time.
* Thus, if there is more than that, it is the test code leaking ZkClient.
*
* ForkJoin is created by using parallel stream or similar Java features. This is out of our control.
* Similar to ZkServer. The limit is to 100 while keep a small _warningLimit.
*
* Timer should not happen. Setting limit to 2 not 0 mostly because even when you cancel the timer
* thread, it may take some not deterministic time for it to go away. So give it some slack here
*
* Also note, this ThreadLeakage checker depends on the fact that tests are running sequentially.
* Otherwise, the report is not going to be accurate.
*/
private static enum ThreadCategory {
ZkServer("zookeeper server threads", 4, 100, ZKSERVER_THRD_PATTERN),
ZkSession("zkclient/zooKeeper session threads", 12, 12, ZKSESSION_THRD_PATTERN),
ForkJoin("fork join pool threads", 2, 100, FORKJOIN_THRD_PATTERN),
Timer("timer threads", 0, 2, TIMER_THRD_PATTERN),
TaskStateModel("TaskStateModel threads", 0, 0, TASKSTATEMODEL_THRD_PATTERN),
Other("Other threads", 0, 2, new String[]{""});
private String _description;
private List<String> _pattern;
private int _warningLimit;
private int _limit;
public String getDescription() {
return _description;
}
public Predicate<String> getMatchPred() {
if (this.name() != ThreadCategory.Other.name()) {
Predicate<String> pred = target -> {
for (String p : _pattern) {
if (target.toLowerCase().contains(p.toLowerCase())) {
return true;
}
}
return false;
};
return pred;
}
List<Predicate<String>> predicateList = new ArrayList<>();
for (ThreadCategory threadCategory : ThreadCategory.values()) {
if (threadCategory == ThreadCategory.Other) {
continue;
}
predicateList.add(threadCategory.getMatchPred());
}
Predicate<String> pred = target -> {
for (Predicate<String> p : predicateList) {
if (p.test(target)) {
return false;
}
}
return true;
};
return pred;
}
public int getWarningLimit() {
return _warningLimit;
}
public int getLimit() {
return _limit;
}
private ThreadCategory(String description, int warningLimit, int limit, String[] patterns) {
_description = description;
_pattern = Arrays.asList(patterns);
_warningLimit = warningLimit;
_limit = limit;
}
}
public static boolean afterClassCheck(String classname) {
ZkTestBase.reportPhysicalMemory();
// step 1: get all active threads
List<Thread> threads = getAllThreads();
LOG.info(classname + " has active threads cnt:" + threads.size());
// step 2: categorize threads
Map<String, List<Thread>> threadByName = null;
Map<ThreadCategory, Integer> threadByCnt = new HashMap<>();
Map<ThreadCategory, Set<Thread>> threadByCat = new HashMap<>();
try {
threadByName = threads.
stream().
filter(p -> p.getThreadGroup() != null && p.getThreadGroup().getName() != null
&& ! "system".equals(p.getThreadGroup().getName())).
collect(Collectors.groupingBy(p -> p.getName()));
} catch (Exception e) {
LOG.error("Filtering thread failure with exception:", e);
}
threadByName.entrySet().stream().forEach(entry -> {
String key = entry.getKey(); // thread name
Arrays.asList(ThreadCategory.values()).stream().forEach(category -> {
if (category.getMatchPred().test(key)) {
Integer count = threadByCnt.containsKey(category) ? threadByCnt.get(category) : 0;
threadByCnt.put(category, count + entry.getValue().size());
Set<Thread> thisSet = threadByCat.getOrDefault(category, new HashSet<>());
thisSet.addAll(entry.getValue());
threadByCat.put(category, thisSet);
}
});
});
// todo: We should make the following System.out as LOG.INfO once we achieve 0 thread leakage.
// todo: also the calling point of this method would fail the test
// step 3: enforce checking policy
boolean checkStatus = true;
for (ThreadCategory threadCategory : ThreadCategory.values()) {
int limit = threadCategory.getLimit();
int warningLimit = threadCategory.getWarningLimit();
Integer categoryThreadCnt = threadByCnt.get(threadCategory);
if (categoryThreadCnt != null) {
boolean dumpThread = false;
if (categoryThreadCnt > limit) {
checkStatus = false;
LOG.info(
"Failure " + threadCategory.getDescription() + " has " + categoryThreadCnt + " thread");
dumpThread = true;
} else if (categoryThreadCnt > warningLimit) {
LOG.info(
"Warning " + threadCategory.getDescription() + " has " + categoryThreadCnt + " thread");
dumpThread = true;
} else {
LOG.info(threadCategory.getDescription() + " has " + categoryThreadCnt + " thread");
}
if (!dumpThread) {
continue;
}
// print first 100 thread names
int i = 0;
for (Thread t : threadByCat.get(threadCategory)) {
LOG.debug(i + " thread:" + t.getName());
i++;
if (i == 100) {
LOG.debug(" skipping the rest");
break;
}
}
}
}
return checkStatus;
}
}