blob: 52fca86f73d1d1fa1db5aad81027f7bd3a2af877 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.mortbay.jetty.nio.SelectChannelConnector;
/**
 * Class that monitors for a certain class of Jetty bug known to
 * affect TaskTrackers. In this type of bug, the Jetty selector
 * thread starts spinning and using ~100% CPU while no actual
 * HTTP content is being served. Given that this bug has been
 * active in Jetty/JDK for a long time with no resolution in sight,
 * this class provides a temporary workaround.
 *
 * Upon detecting the selector thread spinning, it simply exits the
 * JVM with a Fatal message.
 */
class JettyBugMonitor extends Thread {
  private final static Log LOG = LogFactory.getLog(
      JettyBugMonitor.class);

  private static final ThreadMXBean threadBean =
      ManagementFactory.getThreadMXBean();

  private static final String CHECK_ENABLED_KEY =
      "mapred.tasktracker.jetty.cpu.check.enabled";
  private static final boolean CHECK_ENABLED_DEFAULT = true;

  static final String CHECK_INTERVAL_KEY =
      "mapred.tasktracker.jetty.cpu.check.interval";
  /** Sampling interval in milliseconds (passed to Thread.sleep). */
  private static final long CHECK_INTERVAL_DEFAULT = 15*1000;
  private final long checkInterval;

  // Thresholds are fractions of one CPU's wall-clock time summed over all
  // monitored selector threads (0.50f == 50%).
  private static final String WARN_THRESHOLD_KEY =
      "mapred.tasktracker.jetty.cpu.threshold.warn";
  private static final float WARN_THRESHOLD_DEFAULT = 0.50f;
  private final float warnThreshold;

  private static final String FATAL_THRESHOLD_KEY =
      "mapred.tasktracker.jetty.cpu.threshold.fatal";
  private static final float FATAL_THRESHOLD_DEFAULT = 0.90f;
  private final float fatalThreshold;

  /**
   * Set by {@link #shutdown()} (called from another thread) so that the
   * resulting interrupt is not reported as unexpected.
   */
  private volatile boolean stopping = false;

  /**
   * Create the monitoring thread.
   * @param conf configuration supplying the enable flag, check interval
   *             and CPU thresholds
   * @return the monitor (not yet started), or null if the check is disabled
   *         via {@code mapred.tasktracker.jetty.cpu.check.enabled} or if
   *         thread CPU time measurement is not supported by this JVM
   */
  public static JettyBugMonitor create(Configuration conf) {
    if (!conf.getBoolean(CHECK_ENABLED_KEY, CHECK_ENABLED_DEFAULT)) {
      return null;
    }
    if (!threadBean.isThreadCpuTimeSupported()) {
      LOG.warn("Not starting monitor for Jetty bug since thread CPU time " +
          "measurement is not supported by this JVM");
      return null;
    }
    return new JettyBugMonitor(conf);
  }

  /**
   * Build a daemon monitor thread, reading thresholds and the check
   * interval from the given configuration.
   */
  JettyBugMonitor(Configuration conf) {
    setName("Monitor for Jetty bugs");
    setDaemon(true);
    this.warnThreshold = conf.getFloat(
        WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
    this.fatalThreshold = conf.getFloat(
        FATAL_THRESHOLD_KEY, FATAL_THRESHOLD_DEFAULT);
    this.checkInterval = conf.getLong(
        CHECK_INTERVAL_KEY, CHECK_INTERVAL_DEFAULT);
  }

  @Override
  public void run() {
    try {
      doRun();
    } catch (InterruptedException ie) {
      if (!stopping) {
        LOG.warn("Jetty monitor unexpectedly interrupted", ie);
      }
      // Restore the interrupt status rather than silently clearing it.
      Thread.currentThread().interrupt();
    } catch (Throwable t) {
      LOG.error("Jetty bug monitor failed", t);
    }
    LOG.debug("JettyBugMonitor shutting down");
  }

  /**
   * Locate the selector threads and monitor them until one stops running.
   * @throws InterruptedException if interrupted while waiting or sampling
   */
  private void doRun() throws InterruptedException {
    List<Long> tids = waitForJettyThreads();
    if (tids.isEmpty()) {
      LOG.warn("Could not locate Jetty selector threads");
      return;
    }
    try {
      // monitorThreads only ever exits by throwing.
      monitorThreads(tids);
    } catch (ThreadNotRunningException tnre) {
      // A monitored thread is gone; nothing left to watch.
    }
  }

  /**
   * Monitor the given list of threads, summing their CPU usage.
   * If the usage exceeds the configured threshold, aborts the JVM.
   * @param tids thread ids to monitor
   * @throws InterruptedException if interrupted
   * @throws ThreadNotRunningException if one of the threads is no longer
   * running
   */
  private void monitorThreads(List<Long> tids)
      throws InterruptedException, ThreadNotRunningException {
    long timeBefore = System.nanoTime();
    long usageBefore = getCpuUsageNanos(tids);
    while (true) {
      Thread.sleep(checkInterval);
      long usageAfter = getCpuUsageNanos(tids);
      long timeAfter = System.nanoTime();
      long elapsed = timeAfter - timeBefore;
      if (elapsed <= 0) {
        // No measurable wall-clock time passed (e.g. a zero interval).
        // Dividing by it would give NaN/Infinity, and Infinity would
        // spuriously exceed the fatal threshold. Keep the old baseline.
        continue;
      }
      double cpuFraction = (double) (usageAfter - usageBefore) / elapsed;
      String msg = String.format("Jetty CPU usage: %.1f%%", cpuFraction * 100);
      if (cpuFraction > fatalThreshold) {
        LOG.fatal(
            "************************************************************\n" +
            msg + ". This is greater than the fatal threshold " +
            FATAL_THRESHOLD_KEY + ". Aborting JVM.\n" +
            "************************************************************");
        doAbort();
      } else if (cpuFraction > warnThreshold) {
        LOG.warn(msg);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug(msg);
      }
      usageBefore = usageAfter;
      timeBefore = timeAfter;
    }
  }

  /** Terminate the JVM; overridable so tests can intercept the abort. */
  protected void doAbort() {
    Runtime.getRuntime().exit(1);
  }

  /**
   * Wait for jetty selector threads to start, polling once per second
   * for up to 30 attempts.
   * @return the list of thread IDs (empty if none were found)
   * @throws InterruptedException if interrupted
   */
  protected List<Long> waitForJettyThreads() throws InterruptedException {
    List<Long> tids = new ArrayList<Long>();
    int attempts = 0;
    while (tids.isEmpty() && attempts++ < 30) {
      Thread.sleep(1000);
      tids = getJettyThreadIds();
    }
    return tids;
  }

  /**
   * Sum the CPU time, in nanoseconds, consumed so far by the given threads.
   * @throws ThreadNotRunningException if CPU time is unavailable (-1) for
   *         any thread, which per the ThreadMXBean contract means it is not
   *         alive or CPU time measurement is disabled
   */
  private static long getCpuUsageNanos(List<Long> tids)
      throws ThreadNotRunningException {
    long total = 0;
    for (long tid : tids) {
      long time = threadBean.getThreadCpuTime(tid);
      if (time == -1) {
        LOG.warn("Unable to monitor CPU usage for thread: " + tid);
        throw new ThreadNotRunningException();
      }
      total += time;
    }
    return total;
  }

  /** @return the IDs of all live threads that look like Jetty selectors */
  static List<Long> getJettyThreadIds() {
    List<Long> tids = new ArrayList<Long>();
    long[] threadIds = threadBean.getAllThreadIds();
    for (long tid : threadIds) {
      if (isJettySelectorThread(tid)) {
        tids.add(tid);
      }
    }
    return tids;
  }

  /**
   * @return true if the given thread ID appears to be a Jetty selector thread
   * based on its stack trace; false if it is not, or if it has already exited
   */
  private static boolean isJettySelectorThread(long tid) {
    ThreadInfo info = threadBean.getThreadInfo(tid, 20);
    if (info == null) {
      // The thread exited after getAllThreadIds() listed it.
      return false;
    }
    // Compare class names instead of classes, since
    // jetty uses a different classloader.
    String selectorClassName = SelectChannelConnector.class.getName();
    for (StackTraceElement frame : info.getStackTrace()) {
      if (selectorClassName.equals(frame.getClassName())) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Thread #" + tid + " (" + info.getThreadName() + ") " +
              "is a Jetty selector thread.");
        }
        return true;
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Thread #" + tid + " (" + info.getThreadName() + ") " +
          "is not a jetty thread");
    }
    return false;
  }

  /** Signals that a monitored thread's CPU time can no longer be read. */
  private static class ThreadNotRunningException extends Exception {
    private static final long serialVersionUID = 1L;
  }

  /** Stop the monitor: mark the stop as intentional, then interrupt it. */
  public void shutdown() {
    this.stopping = true;
    this.interrupt();
  }
}