| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hama.bsp; |
| |
| import java.io.File; |
| import java.io.FileFilter; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.List; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hama.HamaConfiguration; |
| |
| /** |
| * A simple logger to handle the task-specific user logs. |
| */ |
| public class TaskLog { |
private static final Log LOG = LogFactory.getLog(TaskLog.class.getName());

/**
 * Root directory for all task logs: the {@code tasklogs} child of the
 * directory named by the {@code hama.log.dir} system property, as an
 * absolute path.
 */
private static final File LOG_DIR = new File(
    System.getProperty("hama.log.dir"), "tasklogs").getAbsoluteFile();

static {
  // mkdirs() reports failure only through its return value, and it also
  // returns false when somebody else created the directory in the meantime,
  // so re-check before complaining.
  if (!LOG_DIR.exists() && !LOG_DIR.mkdirs() && !LOG_DIR.isDirectory()) {
    LOG.warn("Unable to create task log directory " + LOG_DIR);
  }
}
| |
| /* |
| * Get LogFile by taskid and distinguish between log and error extension |
| */ |
| public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) { |
| // TODO clean up the log path and type. |
| return new File(LOG_DIR, taskid.getJobID() + "/" + taskid.toString() |
| + ((filter == LogName.STDERR) ? ".err" : ".log")); |
| } |
| |
| /* |
| * Get LogFile by stringPattern and distinguish between log and error |
| * extension |
| */ |
| public static File getLocalTaskLogFile(LogName filter, String stringPattern) { |
| // TODO clean up the log path and type. |
| SimpleDateFormat sdf = new SimpleDateFormat(); |
| sdf.applyPattern(stringPattern); |
| return new File(LOG_DIR, "job_" + sdf.format(new Date()) + "/" + "local_" |
| + sdf.format(new Date()) |
| + ((filter == LogName.STDERR) ? ".err" : ".log")); |
| } |
| |
/**
 * The kinds of user log that can be requested. {@link #toString()} returns
 * the short name passed to each constant's constructor.
 */
public enum LogName {
  /** Log on the stdout of the task. */
  STDOUT("stdout"),

  /** Log on the stderr of the task. */
  STDERR("stderr"),

  /** Log on the system logs of the task. */
  SYSLOG("syslog"),

  /** The java profiler information. */
  PROFILE("profile.out"),

  /** Log the debug script's stdout */
  DEBUGOUT("debugout");

  /** Short name reported by {@link #toString()}. */
  private final String label;

  LogName(String label) {
    this.label = label;
  }

  @Override
  public String toString() {
    return label;
  }
}
| |
| private static class TaskLogsPurgeFilter implements FileFilter { |
| long purgeTimeStamp; |
| |
| TaskLogsPurgeFilter(long purgeTimeStamp) { |
| this.purgeTimeStamp = purgeTimeStamp; |
| } |
| |
| @Override |
| public boolean accept(File file) { |
| LOG.debug("PurgeFilter - file: " + file + ", mtime: " |
| + file.lastModified() + ", purge: " + purgeTimeStamp); |
| return file.lastModified() < purgeTimeStamp; |
| } |
| } |
| |
| /** |
| * Purge old user logs. |
| * |
| * @throws IOException |
| */ |
| public static synchronized void cleanup(int logsRetainHours) |
| throws IOException { |
| // Purge logs of tasks on this tasktracker if their |
| // mtime has exceeded "bsp.task.log.retain" hours |
| long purgeTimeStamp = System.currentTimeMillis() |
| - (logsRetainHours * 60L * 60 * 1000); |
| File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter( |
| purgeTimeStamp)); |
| if (oldTaskLogs != null) { |
| for (File oldTaskLog : oldTaskLogs) { |
| FileUtil.fullyDelete(oldTaskLog); |
| } |
| } |
| } |
| |
| static class Reader extends InputStream { |
| private long bytesRemaining; |
| private FileInputStream file; |
| |
| /** |
| * Read a log file from start to end positions. The offsets may be negative, |
| * in which case they are relative to the end of the file. For example, |
| * Reader(taskid, kind, 0, -1) is the entire file and Reader(taskid, kind, |
| * -4197, -1) is the last 4196 bytes. |
| * |
| * @param taskid the id of the task to read the log file for |
| * @param kind the kind of log to read |
| * @param pStart the offset to read from (negative is relative to tail) |
| * @param pEnd the offset to read upto (negative is relative to tail) |
| * @throws IOException |
| */ |
| public Reader(TaskAttemptID taskid, LogName kind, long pStart, long pEnd) |
| throws IOException { |
| long start = pStart; |
| long end = pEnd; |
| // find the right log file |
| File filename = getTaskLogFile(taskid, kind); |
| // calculate the start and stop |
| long size = filename.length(); |
| if (start < 0) { |
| start += size + 1; |
| } |
| if (end < 0) { |
| end += size + 1; |
| } |
| start = Math.max(0, Math.min(start, size)); |
| end = Math.max(0, Math.min(end, size)); |
| bytesRemaining = end - start; |
| file = new FileInputStream(filename); |
| // skip upto start |
| long pos = 0; |
| while (pos < start) { |
| long result = file.skip(start - pos); |
| if (result < 0) { |
| bytesRemaining = 0; |
| break; |
| } |
| pos += result; |
| } |
| } |
| |
| @Override |
| public int read() throws IOException { |
| int result = -1; |
| if (bytesRemaining > 0) { |
| bytesRemaining -= 1; |
| result = file.read(); |
| } |
| return result; |
| } |
| |
| @Override |
| public int read(byte[] buffer, int offset, int pLength) throws IOException { |
| int length = (int) Math.min(pLength, bytesRemaining); |
| int bytes = file.read(buffer, offset, length); |
| if (bytes > 0) { |
| bytesRemaining -= bytes; |
| } |
| return bytes; |
| } |
| |
| @Override |
| public int available() throws IOException { |
| return (int) Math.min(bytesRemaining, file.available()); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| file.close(); |
| } |
| } |
| |
| private static final String bashCommand = "bash"; |
| private static final String tailCommand = "tail"; |
| |
| /** |
| * Get the desired maximum length of task's logs. |
| * |
| * @param conf the job to look in |
| * @return the number of bytes to cap the log files at |
| */ |
| public static long getTaskLogLength(HamaConfiguration conf) { |
| return conf.getLong("bsp.userlog.limit.kb", 100) * 1024; |
| } |
| |
| /** |
| * Wrap a command in a shell to capture stdout and stderr to files. If the |
| * tailLength is 0, the entire output will be saved. |
| * |
| * @param cmd The command and the arguments that should be run |
| * @param stdoutFilename The filename that stdout should be saved to |
| * @param stderrFilename The filename that stderr should be saved to |
| * @param tailLength The length of the tail to be saved. |
| * @return the modified command that should be run |
| */ |
| public static List<String> captureOutAndError(List<String> cmd, |
| File stdoutFilename, File stderrFilename, long tailLength) |
| throws IOException { |
| return captureOutAndError(null, cmd, stdoutFilename, stderrFilename, |
| tailLength); |
| } |
| |
| /** |
| * Wrap a command in a shell to capture stdout and stderr to files. Setup |
| * commands such as setting memory limit can be passed which will be executed |
| * before exec. If the tailLength is 0, the entire output will be saved. |
| * |
| * @param setup The setup commands for the execed process. |
| * @param cmd The command and the arguments that should be run |
| * @param stdoutFilename The filename that stdout should be saved to |
| * @param stderrFilename The filename that stderr should be saved to |
| * @param tailLength The length of the tail to be saved. |
| * @return the modified command that should be run |
| */ |
| public static List<String> captureOutAndError(List<String> setup, |
| List<String> cmd, File stdoutFilename, File stderrFilename, |
| long tailLength) throws IOException { |
| String stdout = FileUtil.makeShellPath(stdoutFilename); |
| String stderr = FileUtil.makeShellPath(stderrFilename); |
| List<String> result = new ArrayList<String>(3); |
| result.add(bashCommand); |
| result.add("-c"); |
| StringBuffer mergedCmd = new StringBuffer(); |
| if (setup != null && setup.size() > 0) { |
| mergedCmd.append(addCommand(setup, false)); |
| mergedCmd.append(";"); |
| } |
| if (tailLength > 0) { |
| mergedCmd.append("("); |
| } else { |
| mergedCmd.append("exec "); |
| } |
| mergedCmd.append(addCommand(cmd, true)); |
| mergedCmd.append(" < /dev/null "); |
| if (tailLength > 0) { |
| mergedCmd.append(" | "); |
| mergedCmd.append(tailCommand); |
| mergedCmd.append(" -c "); |
| mergedCmd.append(tailLength); |
| mergedCmd.append(" > "); |
| mergedCmd.append(stdout); |
| mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | "); |
| mergedCmd.append(tailCommand); |
| mergedCmd.append(" -c "); |
| mergedCmd.append(tailLength); |
| mergedCmd.append(" > "); |
| mergedCmd.append(stderr); |
| mergedCmd.append(" ; exit $PIPESTATUS"); |
| } else { |
| mergedCmd.append(" 1> "); |
| mergedCmd.append(stdout); |
| mergedCmd.append(" 2> "); |
| mergedCmd.append(stderr); |
| } |
| result.add(mergedCmd.toString()); |
| return result; |
| } |
| |
| public static List<String> captureOutAndErrorTee(List<String> setup, |
| List<String> cmd, File stdoutFilename, File stderrFilename, |
| long tailLength) throws IOException { |
| String stdout = FileUtil.makeShellPath(stdoutFilename); |
| List<String> result = new ArrayList<String>(3); |
| result.add(bashCommand); |
| result.add("-c"); |
| StringBuilder mergedCmd = new StringBuilder(); |
| |
| mergedCmd.append(addCommand(cmd, true)); |
| mergedCmd.append(" 2>&1 | tee ").append(stdout); |
| |
| result.add(mergedCmd.toString()); |
| return result; |
| } |
| |
| /** |
| * Add quotes to each of the command strings and return as a single string |
| * |
| * @param cmd The command to be quoted |
| * @param pIsExecutable makes shell path if the first argument is executable |
| * @return returns The quoted string. |
| * @throws IOException |
| */ |
| public static String addCommand(List<String> cmd, boolean pIsExecutable) |
| throws IOException { |
| boolean isExecutable = pIsExecutable; |
| StringBuffer command = new StringBuffer(); |
| for (String s : cmd) { |
| command.append('\''); |
| if (isExecutable) { |
| // the executable name needs to be expressed as a shell path for the |
| // shell to find it. |
| command.append(FileUtil.makeShellPath(new File(s))); |
| isExecutable = false; |
| } else { |
| command.append(s); |
| } |
| command.append('\''); |
| command.append(" "); |
| } |
| return command.toString(); |
| } |
| |
| /** |
| * Wrap a command in a shell to capture debug script's stdout and stderr to |
| * debugout. |
| * |
| * @param cmd The command and the arguments that should be run |
| * @param debugoutFilename The filename that stdout and stderr should be saved |
| * to. |
| * @return the modified command that should be run |
| * @throws IOException |
| */ |
| public static List<String> captureDebugOut(List<String> cmd, |
| File debugoutFilename) throws IOException { |
| String debugout = FileUtil.makeShellPath(debugoutFilename); |
| List<String> result = new ArrayList<String>(3); |
| result.add(bashCommand); |
| result.add("-c"); |
| StringBuffer mergedCmd = new StringBuffer(); |
| mergedCmd.append("exec "); |
| boolean isExecutable = true; |
| for (String s : cmd) { |
| if (isExecutable) { |
| // the executable name needs to be expressed as a shell path for the |
| // shell to find it. |
| mergedCmd.append(FileUtil.makeShellPath(new File(s))); |
| isExecutable = false; |
| } else { |
| mergedCmd.append(s); |
| } |
| mergedCmd.append(" "); |
| } |
| mergedCmd.append(" < /dev/null "); |
| mergedCmd.append(" >"); |
| mergedCmd.append(debugout); |
| mergedCmd.append(" 2>&1 "); |
| result.add(mergedCmd.toString()); |
| return result; |
| } |
| |
| /** |
| * Get the desired maximum length of task's logs. |
| * |
| * @param conf the job to look in |
| * @return the number of bytes to cap the log files at |
| */ |
| public static long getTaskLogLength(Configuration conf) { |
| return conf.getLong("bsp.userlog.limit.kb", 100) * 1024; |
| } |
| |
| } // TaskLog |