blob: 849cbf925692d36ddebf670663fd608a2cb62199 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.invariants;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* This policy checks at every invocation that a given set of invariants
* (specified in a file) are respected over QueueMetrics and JvmMetrics. The
* file may contain arbitrary (Javascrip) boolean expression over the metrics
* variables.
*
* The right set of invariants depends on the deployment environment, a large
* number of complex invariant can make this check expensive.
*
* The MetricsInvariantChecker can be configured to throw a RuntimeException or
* simlpy warn in the logs if an invariant is not respected.
*/
public class MetricsInvariantChecker extends InvariantsChecker {
private static final Logger LOG =
LoggerFactory.getLogger(MetricsInvariantChecker.class);
public static final String INVARIANTS_FILE =
"yarn.resourcemanager.invariant-checker.file";
private MetricsSystem metricsSystem;
private MetricsCollectorImpl collector;
private SimpleBindings bindings;
private ScriptEngineManager manager;
private Compilable scriptEngine;
private String invariantFile;
private Map<String, CompiledScript> invariants;
private CompiledScript combinedInvariants;
// set of metrics we monitor
private QueueMetrics queueMetrics;
private JvmMetrics jvmMetrics;
@Override
public void init(Configuration config, RMContext rmContext,
ResourceScheduler preemptableResourceScheduler) {
super.init(config, rmContext, preemptableResourceScheduler);
this.metricsSystem = DefaultMetricsSystem.instance();
this.queueMetrics =
QueueMetrics.forQueue(metricsSystem, "root", null, false, getConf());
this.jvmMetrics = (JvmMetrics) metricsSystem.getSource("JvmMetrics");
// at first collect all metrics
collector = new MetricsCollectorImpl();
queueMetrics.getMetrics(collector, true);
jvmMetrics.getMetrics(collector, true);
// prepare bindings and evaluation engine
this.bindings = new SimpleBindings();
this.manager = new ScriptEngineManager();
this.scriptEngine = (Compilable) manager.getEngineByName("JavaScript");
// load metrics invariant from file
this.invariantFile = getConf().get(MetricsInvariantChecker.INVARIANTS_FILE);
this.invariants = new HashMap<>();
// preload all bindings
queueMetrics.getMetrics(collector, true);
jvmMetrics.getMetrics(collector, true);
for (MetricsRecord record : collector.getRecords()) {
for (AbstractMetric am : record.metrics()) {
bindings.put(am.name().replace(' ', '_'), am.value());
}
}
StringBuilder sb = new StringBuilder();
try {
List<String> tempInv =
Files.readLines(new File(invariantFile), Charsets.UTF_8);
boolean first = true;
// precompile individual invariants
for (String inv : tempInv) {
if(first) {
first = false;
} else {
sb.append("&&");
}
invariants.put(inv, scriptEngine.compile(inv));
sb.append(" (");
sb.append(inv);
sb.append(") ");
}
// create a single large combined invariant for speed of checking
combinedInvariants = scriptEngine.compile(sb.toString());
} catch (IOException e) {
throw new RuntimeException(
"Error loading invariant file: " + e.getMessage());
} catch (ScriptException e) {
throw new RuntimeException("Error compiling invariant " + e.getMessage());
}
}
@Override
public void editSchedule() {
// grab all changed metrics and update bindings
collector.clear();
queueMetrics.getMetrics(collector, false);
jvmMetrics.getMetrics(collector, false);
for (MetricsRecord record : collector.getRecords()) {
for (AbstractMetric am : record.metrics()) {
bindings.put(am.name().replace(' ', '_'), am.value());
}
}
// evaluate all invariants with new bindings
try {
// fastpath check all invariants at once (much faster)
boolean allInvHold = (boolean) combinedInvariants.eval(bindings);
// if any fails, check individually to produce more insightful log
if (!allInvHold) {
for (Map.Entry<String, CompiledScript> e : invariants.entrySet()) {
boolean invariantsHold = (boolean) e.getValue().eval(bindings);
if (!invariantsHold) {
// filter bindings to produce minimal set
Map<String, Object> matchingBindings =
extractMatchingBindings(e.getKey(), bindings);
logOrThrow("Invariant \"" + e.getKey()
+ "\" is NOT holding, with bindings: " + matchingBindings);
}
}
}
} catch (ScriptException e) {
logOrThrow(e.getMessage());
}
}
private static Map<String, Object> extractMatchingBindings(String inv,
SimpleBindings allBindings) {
Map<String, Object> matchingBindings = new HashMap<>();
for (Map.Entry<String, Object> s : allBindings.entrySet()) {
if (inv.contains(s.getKey())) {
matchingBindings.put(s.getKey(), s.getValue());
}
}
return matchingBindings;
}
}