blob: a1886be7fb36085b6d210951bcdd9f092529e337 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hama.HamaConfiguration;
/**
* A simple logger to handle the task-specific user logs.
*/
public class TaskLog {
private static final Log LOG = LogFactory.getLog(TaskLog.class.getName());
private static final File LOG_DIR = new File(
System.getProperty("hama.log.dir"), "tasklogs").getAbsoluteFile();
static {
if (!LOG_DIR.exists()) {
LOG_DIR.mkdirs();
}
}
/*
* Get LogFile by taskid and distinguish between log and error extension
*/
public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
// TODO clean up the log path and type.
return new File(LOG_DIR, taskid.getJobID() + "/" + taskid.toString()
+ ((filter == LogName.STDERR) ? ".err" : ".log"));
}
/*
* Get LogFile by stringPattern and distinguish between log and error
* extension
*/
public static File getLocalTaskLogFile(LogName filter, String stringPattern) {
// TODO clean up the log path and type.
SimpleDateFormat sdf = new SimpleDateFormat();
sdf.applyPattern(stringPattern);
return new File(LOG_DIR, "job_" + sdf.format(new Date()) + "/" + "local_"
+ sdf.format(new Date())
+ ((filter == LogName.STDERR) ? ".err" : ".log"));
}
/**
* The filter for userlogs.
*/
public static enum LogName {
/** Log on the stdout of the task. */
STDOUT("stdout"),
/** Log on the stderr of the task. */
STDERR("stderr"),
/** Log on the map-reduce system logs of the task. */
SYSLOG("syslog"),
/** The java profiler information. */
PROFILE("profile.out"),
/** Log the debug script's stdout */
DEBUGOUT("debugout");
private String prefix;
private LogName(String prefix) {
this.prefix = prefix;
}
@Override
public String toString() {
return prefix;
}
}
private static class TaskLogsPurgeFilter implements FileFilter {
long purgeTimeStamp;
TaskLogsPurgeFilter(long purgeTimeStamp) {
this.purgeTimeStamp = purgeTimeStamp;
}
@Override
public boolean accept(File file) {
LOG.debug("PurgeFilter - file: " + file + ", mtime: "
+ file.lastModified() + ", purge: " + purgeTimeStamp);
return file.lastModified() < purgeTimeStamp;
}
}
/**
* Purge old user logs.
*
* @throws IOException
*/
public static synchronized void cleanup(int logsRetainHours)
throws IOException {
// Purge logs of tasks on this tasktracker if their
// mtime has exceeded "bsp.task.log.retain" hours
long purgeTimeStamp = System.currentTimeMillis()
- (logsRetainHours * 60L * 60 * 1000);
File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter(
purgeTimeStamp));
if (oldTaskLogs != null) {
for (File oldTaskLog : oldTaskLogs) {
FileUtil.fullyDelete(oldTaskLog);
}
}
}
static class Reader extends InputStream {
private long bytesRemaining;
private FileInputStream file;
/**
* Read a log file from start to end positions. The offsets may be negative,
* in which case they are relative to the end of the file. For example,
* Reader(taskid, kind, 0, -1) is the entire file and Reader(taskid, kind,
* -4197, -1) is the last 4196 bytes.
*
* @param taskid the id of the task to read the log file for
* @param kind the kind of log to read
* @param pStart the offset to read from (negative is relative to tail)
* @param pEnd the offset to read upto (negative is relative to tail)
* @throws IOException
*/
public Reader(TaskAttemptID taskid, LogName kind, long pStart, long pEnd)
throws IOException {
long start = pStart;
long end = pEnd;
// find the right log file
File filename = getTaskLogFile(taskid, kind);
// calculate the start and stop
long size = filename.length();
if (start < 0) {
start += size + 1;
}
if (end < 0) {
end += size + 1;
}
start = Math.max(0, Math.min(start, size));
end = Math.max(0, Math.min(end, size));
bytesRemaining = end - start;
file = new FileInputStream(filename);
// skip upto start
long pos = 0;
while (pos < start) {
long result = file.skip(start - pos);
if (result < 0) {
bytesRemaining = 0;
break;
}
pos += result;
}
}
@Override
public int read() throws IOException {
int result = -1;
if (bytesRemaining > 0) {
bytesRemaining -= 1;
result = file.read();
}
return result;
}
@Override
public int read(byte[] buffer, int offset, int pLength) throws IOException {
int length = (int) Math.min(pLength, bytesRemaining);
int bytes = file.read(buffer, offset, length);
if (bytes > 0) {
bytesRemaining -= bytes;
}
return bytes;
}
@Override
public int available() throws IOException {
return (int) Math.min(bytesRemaining, file.available());
}
@Override
public void close() throws IOException {
file.close();
}
}
private static final String bashCommand = "bash";
private static final String tailCommand = "tail";
/**
* Get the desired maximum length of task's logs.
*
* @param conf the job to look in
* @return the number of bytes to cap the log files at
*/
public static long getTaskLogLength(HamaConfiguration conf) {
return conf.getLong("bsp.userlog.limit.kb", 100) * 1024;
}
/**
* Wrap a command in a shell to capture stdout and stderr to files. If the
* tailLength is 0, the entire output will be saved.
*
* @param cmd The command and the arguments that should be run
* @param stdoutFilename The filename that stdout should be saved to
* @param stderrFilename The filename that stderr should be saved to
* @param tailLength The length of the tail to be saved.
* @return the modified command that should be run
*/
public static List<String> captureOutAndError(List<String> cmd,
File stdoutFilename, File stderrFilename, long tailLength)
throws IOException {
return captureOutAndError(null, cmd, stdoutFilename, stderrFilename,
tailLength);
}
/**
* Wrap a command in a shell to capture stdout and stderr to files. Setup
* commands such as setting memory limit can be passed which will be executed
* before exec. If the tailLength is 0, the entire output will be saved.
*
* @param setup The setup commands for the execed process.
* @param cmd The command and the arguments that should be run
* @param stdoutFilename The filename that stdout should be saved to
* @param stderrFilename The filename that stderr should be saved to
* @param tailLength The length of the tail to be saved.
* @return the modified command that should be run
*/
public static List<String> captureOutAndError(List<String> setup,
List<String> cmd, File stdoutFilename, File stderrFilename,
long tailLength) throws IOException {
String stdout = FileUtil.makeShellPath(stdoutFilename);
String stderr = FileUtil.makeShellPath(stderrFilename);
List<String> result = new ArrayList<String>(3);
result.add(bashCommand);
result.add("-c");
StringBuffer mergedCmd = new StringBuffer();
if (setup != null && setup.size() > 0) {
mergedCmd.append(addCommand(setup, false));
mergedCmd.append(";");
}
if (tailLength > 0) {
mergedCmd.append("(");
} else {
mergedCmd.append("exec ");
}
mergedCmd.append(addCommand(cmd, true));
mergedCmd.append(" < /dev/null ");
if (tailLength > 0) {
mergedCmd.append(" | ");
mergedCmd.append(tailCommand);
mergedCmd.append(" -c ");
mergedCmd.append(tailLength);
mergedCmd.append(" > ");
mergedCmd.append(stdout);
mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
mergedCmd.append(tailCommand);
mergedCmd.append(" -c ");
mergedCmd.append(tailLength);
mergedCmd.append(" > ");
mergedCmd.append(stderr);
mergedCmd.append(" ; exit $PIPESTATUS");
} else {
mergedCmd.append(" 1> ");
mergedCmd.append(stdout);
mergedCmd.append(" 2> ");
mergedCmd.append(stderr);
}
result.add(mergedCmd.toString());
return result;
}
public static List<String> captureOutAndErrorTee(List<String> setup,
List<String> cmd, File stdoutFilename, File stderrFilename,
long tailLength) throws IOException {
String stdout = FileUtil.makeShellPath(stdoutFilename);
List<String> result = new ArrayList<String>(3);
result.add(bashCommand);
result.add("-c");
StringBuilder mergedCmd = new StringBuilder();
mergedCmd.append(addCommand(cmd, true));
mergedCmd.append(" 2>&1 | tee ").append(stdout);
result.add(mergedCmd.toString());
return result;
}
/**
* Add quotes to each of the command strings and return as a single string
*
* @param cmd The command to be quoted
* @param pIsExecutable makes shell path if the first argument is executable
* @return returns The quoted string.
* @throws IOException
*/
public static String addCommand(List<String> cmd, boolean pIsExecutable)
throws IOException {
boolean isExecutable = pIsExecutable;
StringBuffer command = new StringBuffer();
for (String s : cmd) {
command.append('\'');
if (isExecutable) {
// the executable name needs to be expressed as a shell path for the
// shell to find it.
command.append(FileUtil.makeShellPath(new File(s)));
isExecutable = false;
} else {
command.append(s);
}
command.append('\'');
command.append(" ");
}
return command.toString();
}
/**
* Wrap a command in a shell to capture debug script's stdout and stderr to
* debugout.
*
* @param cmd The command and the arguments that should be run
* @param debugoutFilename The filename that stdout and stderr should be saved
* to.
* @return the modified command that should be run
* @throws IOException
*/
public static List<String> captureDebugOut(List<String> cmd,
File debugoutFilename) throws IOException {
String debugout = FileUtil.makeShellPath(debugoutFilename);
List<String> result = new ArrayList<String>(3);
result.add(bashCommand);
result.add("-c");
StringBuffer mergedCmd = new StringBuffer();
mergedCmd.append("exec ");
boolean isExecutable = true;
for (String s : cmd) {
if (isExecutable) {
// the executable name needs to be expressed as a shell path for the
// shell to find it.
mergedCmd.append(FileUtil.makeShellPath(new File(s)));
isExecutable = false;
} else {
mergedCmd.append(s);
}
mergedCmd.append(" ");
}
mergedCmd.append(" < /dev/null ");
mergedCmd.append(" >");
mergedCmd.append(debugout);
mergedCmd.append(" 2>&1 ");
result.add(mergedCmd.toString());
return result;
}
/**
* Get the desired maximum length of task's logs.
*
* @param conf the job to look in
* @return the number of bytes to cap the log files at
*/
public static long getTaskLogLength(Configuration conf) {
return conf.getLong("bsp.userlog.limit.kb", 100) * 1024;
}
} // TaskLog