blob: 6f3e9fec1ea669f0c7a4074014c1a7821a7a9665 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Appender;
import org.apache.tez.common.TezContainerLogAppender;
import org.apache.tez.dag.api.TezConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT;
/**
 * Captures thread dumps of the current JVM at a configured interval and writes each dump to its
 * own file named {@code <name>_<currentTimeMillis>.jstack}.
 *
 * <p>Dumps are written into the container log directory when the root log4j logger has a
 * {@link TezContainerLogAppender} registered under {@link TezConstants#TEZ_CONTAINER_LOGGER_NAME};
 * otherwise they are written under the directory configured by {@code NM_REMOTE_APP_LOG_DIR}.
 *
 * <p>Instances are obtained through {@link #getInstance(Configuration)}, which returns a no-op
 * helper when the feature is disabled or cannot be initialized.
 */
public class TezThreadDumpHelper {

  /** Shared helper whose {@code start}/{@code stop} do nothing; returned when dumps are disabled. */
  public static final NoopTezThreadDumpHelper NOOP_TEZ_THREAD_DUMP_HELPER = new NoopTezThreadDumpHelper();

  private static final ThreadMXBean THREAD_BEAN = ManagementFactory.getThreadMXBean();
  private static final Logger LOG = LoggerFactory.getLogger(TezThreadDumpHelper.class);

  /** How long {@link #stop()} waits for an in-flight dump before forcing the executor down. */
  private static final long STOP_TIMEOUT_MS = 100L;

  /** Delay between two consecutive dumps, in milliseconds; {@code 0} for an unconfigured helper. */
  private final long duration;
  /** Directory the dump files are created in; {@code null} for an unconfigured helper. */
  private final Path basePath;
  /** File system that owns {@link #basePath}; {@code null} for an unconfigured helper. */
  private final FileSystem fs;

  private ScheduledExecutorService periodicThreadDumpServiceExecutor;

  /**
   * Creates a configured helper and resolves the directory and file system dumps are written to.
   *
   * @param duration delay between dumps in milliseconds
   * @param conf configuration used to resolve the target file system and the fallback directory
   * @throws IOException if the target file system cannot be obtained
   */
  private TezThreadDumpHelper(long duration, Configuration conf) throws IOException {
    this.duration = duration;
    Appender appender = org.apache.log4j.Logger.getRootLogger().getAppender(TezConstants.TEZ_CONTAINER_LOGGER_NAME);
    if (appender instanceof TezContainerLogAppender) {
      this.basePath = new Path(((TezContainerLogAppender) appender).getContainerLogDir());
      this.fs = FileSystem.getLocal(conf);
    } else {
      // Fallback, if it is any other appender or if none is configured.
      this.basePath = new Path(conf.get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR));
      this.fs = this.basePath.getFileSystem(conf);
    }
    LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms frequency and at " +
        "path: {}", duration, basePath);
  }

  /**
   * Creates an unconfigured helper (no interval, no target directory). Calling
   * {@link #start(String)} on such an instance does not schedule anything.
   */
  public TezThreadDumpHelper() {
    this.duration = 0L;
    this.basePath = null;
    this.fs = null;
  }

  /**
   * Builds a helper from configuration.
   *
   * @param conf configuration providing {@code TEZ_THREAD_DUMP_INTERVAL}
   * @return a configured helper when the interval is positive and initialization succeeds,
   *     otherwise {@link #NOOP_TEZ_THREAD_DUMP_HELPER}
   */
  public static TezThreadDumpHelper getInstance(Configuration conf) {
    long periodicThreadDumpFrequency =
        conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
    if (periodicThreadDumpFrequency > 0) {
      try {
        return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
      } catch (IOException e) {
        LOG.warn("Can not initialize periodic thread dump service", e);
      }
    }
    return NOOP_TEZ_THREAD_DUMP_HELPER;
  }

  /**
   * Starts capturing thread dumps on a single daemon thread. The first dump is taken after one
   * interval and subsequent dumps follow with the same delay between the end of one dump and the
   * start of the next.
   *
   * <p>If the helper is already running, the previous executor is shut down first so that it is
   * not leaked. If the helper is unconfigured, nothing is scheduled.
   *
   * @param name label used in the thread name, the dump file name and the dump title
   * @return this helper, to allow call chaining
   */
  public TezThreadDumpHelper start(String name) {
    if (duration <= 0 || basePath == null || fs == null) {
      LOG.warn("Periodic thread dump service is not configured, not starting it for {}", name);
      return this;
    }
    // A second start() would otherwise orphan a still-scheduled executor.
    shutdownExecutor();

    periodicThreadDumpServiceExecutor = Executors.newScheduledThreadPool(1,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PeriodicThreadDumpService{" + name + "} #%d")
            .build());
    Runnable threadDumpCollector = new ThreadDumpCollector(basePath, name, fs);
    // schedule() would run the collector only once; a fixed delay makes the capture periodic and
    // guarantees that a slow dump never overlaps or queues up behind the next one.
    periodicThreadDumpServiceExecutor.scheduleWithFixedDelay(threadDumpCollector, duration, duration,
        TimeUnit.MILLISECONDS);
    return this;
  }

  /**
   * Stops capturing thread dumps. Waits briefly for an in-flight dump to finish and then forces
   * the executor down. Safe to call when the helper was never started.
   */
  public void stop() {
    shutdownExecutor();
  }

  /** Shuts the executor down, if any, and clears the reference so the helper can be restarted. */
  private void shutdownExecutor() {
    ScheduledExecutorService executor = periodicThreadDumpServiceExecutor;
    if (executor == null) {
      return;
    }
    periodicThreadDumpServiceExecutor = null;
    executor.shutdown();
    try {
      executor.awaitTermination(STOP_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Preserve the interrupt for the caller; the forced shutdown below still runs.
      Thread.currentThread().interrupt();
    }
    // No-op when the executor already terminated, otherwise interrupts the running dump.
    executor.shutdownNow();
  }

  /** Task that writes one thread dump file per invocation. */
  private static class ThreadDumpCollector implements Runnable {
    private final Path path;
    private final String name;
    private final FileSystem fs;

    /**
     * @param path directory the dump file is created in
     * @param name label used as file name prefix and dump title
     * @param fs file system used to create the dump file
     */
    ThreadDumpCollector(Path path, String name, FileSystem fs) {
      this.path = path;
      this.fs = fs;
      this.name = name;
    }

    /**
     * Writes a single dump unless the executing thread has been interrupted. Failures are logged
     * and never propagated: an exception escaping a periodic task would silently cancel all
     * subsequent executions.
     */
    @Override
    public void run() {
      if (Thread.interrupted()) {
        return;
      }
      try {
        writeDump(new Path(path, name + "_" + System.currentTimeMillis() + ".jstack"));
      } catch (IOException | RuntimeException e) {
        LOG.warn("Failed to capture thread dump for {}", name, e);
      }
    }

    /** Creates {@code dumpFile} and writes the current thread dump into it. */
    private void writeDump(Path dumpFile) throws IOException {
      try (FSDataOutputStream fsStream = fs.create(dumpFile);
          PrintStream printStream = new PrintStream(fsStream, false, "UTF8")) {
        printThreadInfo(printStream, name);
        // PrintStream swallows write failures; surface them so truncated dumps are noticed.
        if (printStream.checkError()) {
          LOG.warn("Error while writing thread dump to {}", dumpFile);
        }
      }
    }

    /**
     * Prints state, contention counters, lock information and stack trace of every thread
     * reported by the {@link ThreadMXBean}, then flushes the stream.
     *
     * @param stream destination of the dump
     * @param title label printed in the dump header
     */
    public synchronized void printThreadInfo(PrintStream stream, String title) {
      boolean contention = THREAD_BEAN.isThreadContentionMonitoringEnabled();
      long[] threadIds = THREAD_BEAN.getAllThreadIds();
      stream.println("Process Thread Dump: " + title);
      stream.println(threadIds.length + " active threads");
      for (long tid : threadIds) {
        ThreadInfo info = THREAD_BEAN.getThreadInfo(tid, Integer.MAX_VALUE);
        if (info == null) {
          // The thread ended between listing the ids and querying its info.
          stream.println(" Inactive");
          continue;
        }
        stream.println("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ":");
        Thread.State state = info.getThreadState();
        stream.println(" State: " + state);
        stream.println(" Blocked count: " + info.getBlockedCount());
        stream.println(" Waited count: " + info.getWaitedCount());
        if (contention) {
          stream.println(" Blocked time: " + info.getBlockedTime());
          stream.println(" Waited time: " + info.getWaitedTime());
        }
        if (state == Thread.State.WAITING) {
          stream.println(" Waiting on " + info.getLockName());
        } else if (state == Thread.State.BLOCKED) {
          stream.println(" Blocked on " + info.getLockName());
          stream.println(" Blocked by " + getTaskName(info.getLockOwnerId(), info.getLockOwnerName()));
        }
        stream.println(" Stack:");
        for (StackTraceElement frame : info.getStackTrace()) {
          stream.println(" " + frame.toString());
        }
      }
      stream.flush();
    }

    /** Formats a thread as {@code id (name)}, or just {@code id} when the name is unknown. */
    private String getTaskName(long id, String taskName) {
      if (taskName == null) {
        return Long.toString(id);
      }
      return id + " (" + taskName + ")";
    }
  }

  /** Helper used when thread dumps are disabled; starting and stopping it has no effect. */
  private static class NoopTezThreadDumpHelper extends TezThreadDumpHelper {

    @Override
    public TezThreadDumpHelper start(String name) {
      // Do Nothing
      return this;
    }

    @Override
    public void stop() {
      // Do Nothing
    }
  }
}