[STORM-3492] Add config to prevent good supervisor with bad workers from going to blacklist when necessary
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e3c33bd..1a17e52 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -343,6 +343,7 @@
blacklist.scheduler.resume.time.secs: 1800
blacklist.scheduler.reporter: "org.apache.storm.scheduler.blacklist.reporters.LogReporter"
blacklist.scheduler.strategy: "org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy"
+blacklist.scheduler.assume.supervisor.bad.based.on.bad.slot: true
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 26cacff..820d80f 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -138,6 +138,17 @@
public static final String BLACKLIST_SCHEDULER_STRATEGY = "blacklist.scheduler.strategy";
/**
+ * Whether {@link org.apache.storm.scheduler.blacklist.BlacklistScheduler} will assume the supervisor is bad
+ * based on bad slots or not.
+ * A bad slot indicates the situation where the nimbus doesn't receive heartbeat from the worker in time,
+ * it's hard to differentiate if it's because of the supervisor node or the worker itself.
+ * If this is set to true, the scheduler will consider a supervisor is bad when seeing bad slots in it.
+ * Otherwise, the scheduler will assume a supervisor is bad only when it does not receive supervisor heartbeat in time.
+ */
+ @IsBoolean
+ public static final String BLACKLIST_SCHEDULER_ASSUME_SUPERVISOR_BAD_BASED_ON_BAD_SLOT = "blacklist.scheduler.assume.supervisor.bad.based.on.bad.slot";
+
+ /**
* Whether we want to display all the resource capacity and scheduled usage on the UI page. You MUST have this variable set if you are
* using any kind of resource-related scheduler.
* <p/>
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index a2bedcb..33c0e04 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -55,6 +55,7 @@
protected EvictingQueue<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow;
protected int windowSize;
protected volatile Set<String> blacklistedSupervisorIds; // supervisor ids
+ private boolean blacklistOnBadSlots;
private Map<String, Object> conf;
public BlacklistScheduler(IScheduler underlyingScheduler, StormMetricsRegistry metricsRegistry) {
@@ -90,6 +91,7 @@
badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize);
cachedSupervisors = new HashMap<>();
blacklistedSupervisorIds = new HashSet<>();
+ blacklistOnBadSlots = ObjectReader.getBoolean(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_ASSUME_SUPERVISOR_BAD_BASED_ON_BAD_SLOT), true);
//nimbus:num-blacklisted-supervisor + non-blacklisted supervisor = nimbus:num-supervisors
metricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", () -> blacklistedSupervisorIds.size());
@@ -137,15 +139,16 @@
String key = entry.getKey();
SupervisorDetails supervisorDetails = entry.getValue();
if (cachedSupervisors.containsKey(key)) {
- Set<Integer> badSlots = badSlots(supervisorDetails, key);
- if (badSlots.size() > 0) { //supervisor contains bad slots
- badSupervisors.put(key, badSlots);
+ if (blacklistOnBadSlots) {
+ Set<Integer> badSlots = badSlots(supervisorDetails, key);
+ if (badSlots.size() > 0) { //supervisor contains bad slots
+ badSupervisors.put(key, badSlots);
+ }
}
} else {
cachedSupervisors.put(key, supervisorDetails.getAllPorts()); //new supervisor to cache
}
}
-
badSupervisorsToleranceSlidingWindow.add(badSupervisors);
}
@@ -160,7 +163,6 @@
allPorts.addAll(cachedSupervisorPorts);
cachedSupervisors.put(supervisorKey, allPorts);
}
-
Set<Integer> badSlots = Sets.difference(cachedSupervisorPorts, supervisorPorts);
return badSlots;
}
@@ -198,7 +200,8 @@
for (String supervisor : supervisors) {
int supervisorCount = supervisorCountMap.getOrDefault(supervisor, 0);
Set<Integer> slots = item.get(supervisor);
- if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all of its slots matched the cached supervisor
+ // treat supervisor as bad only if all of its slots matched the cached supervisor
+ if (slots.equals(cachedSupervisors.get(supervisor))) {
// track how many times a cached supervisor has been marked bad
supervisorCountMap.put(supervisor, supervisorCount + 1);
}