blob: cf22cf39ec68f3f358c9d1a202f25c871ec71084 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.streaming;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TestMiniMRWithDFS;
import org.apache.hadoop.util.Shell;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* Tests the environment that the TaskTracker (TT) sets for child processes of the task JVM.
* The test launches a streaming job that uses a shell script as its mapper.
*/
public class TestStreamingTaskLog {
  /** Text written to the single input file of the streaming job. */
  String input = "the dummy input";
  /** Input directory passed to the streaming job via {@code -input}. */
  Path inputPath = new Path("inDir");
  /** Output directory passed to the streaming job via {@code -output}. */
  Path outputPath = new Path("outDir");
  /** Absolute path of the mapper script; set by the test before the job runs. */
  String map = null;
  /** Mini MR cluster used to run the job; created by the test, shut down in its finally block. */
  MiniMRCluster mr = null;
  /** Local file system used for the test directory, the input file and the job output. */
  FileSystem fs = null;
  /** User log limit (in KB) configured for the job; 5 KB is used as the log size. */
  static final long USERLOG_LIMIT_KB = 5;

  /**
   * Builds the command-line arguments for the streaming job.
   * Requires {@link #map}, {@link #mr} and {@link #fs} to have been initialized.
   *
   * @return the argument array handed to {@code StreamJob}
   */
  String[] genArgs() {
    return new String[] {
      "-input", inputPath.toString(),
      "-output", outputPath.toString(),
      "-mapper", map,
      "-reducer", StreamJob.REDUCE_NONE,
      "-jobconf", "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort(),
      "-jobconf", "fs.default.name=" + fs.getUri().toString(),
      "-jobconf", "mapred.map.tasks=1",
      "-jobconf", "keep.failed.task.files=true",
      "-jobconf", "mapreduce.task.userlog.limit.kb=" + USERLOG_LIMIT_KB,
      "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp")
    };
  }

  /**
   * This test validates the setting of HADOOP_ROOT_LOGGER to 'INFO,TLA' and the
   * dependent properties
   *  (a) hadoop.tasklog.taskid and
   *  (b) hadoop.tasklog.totalLogFileSize
   * for the children of java tasks in streaming jobs.
   */
  @Test
  public void testStreamingTaskLogWithHadoopCmd() {
    try {
      final int numSlaves = 1;
      Configuration conf = new Configuration();
      fs = FileSystem.getLocal(conf);

      // Work in a test-specific subdirectory: recursively deleting the
      // "test.build.data" directory itself (which defaults to /tmp) would
      // wipe files that do not belong to this test.
      Path testDir = new Path(System.getProperty("test.build.data", "/tmp"),
          "TestStreamingTaskLog");
      if (fs.exists(testDir)) {
        fs.delete(testDir, true);
      }
      fs.mkdirs(testDir);

      File scriptFile = createScript(testDir.toString() + "/testTaskLog.sh");
      mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
      writeInputFile(fs, inputPath);
      map = scriptFile.getAbsolutePath();

      runStreamJobAndValidateEnv();

      fs.delete(outputPath, true);
      assertFalse("output not cleaned up", fs.exists(outputPath));
      mr.waitUntilIdle();
    } catch (IOException e) {
      // Fail the test, but keep the original exception as the cause so the
      // stack trace is not lost in the test report.
      AssertionError failure = new AssertionError(e.toString());
      failure.initCause(e);
      throw failure;
    } finally {
      if (mr != null) {
        mr.shutdown();
      }
    }
  }

  /**
   * Creates an executable shell script that discards its standard input and
   * then echoes the values of HADOOP_ROOT_LOGGER and HADOOP_CLIENT_OPTS.
   *
   * @param script path of the script file to (re)create
   * @return the created script file
   * @throws IOException if the file cannot be written or made executable
   */
  private File createScript(String script) throws IOException {
    File scriptFile = new File(script);
    UtilTest.recursiveDelete(scriptFile);
    FileOutputStream out = new FileOutputStream(scriptFile);
    try {
      out.write(("cat > /dev/null 2>&1\n" +
          "echo $HADOOP_ROOT_LOGGER $HADOOP_CLIENT_OPTS").getBytes("UTF-8"));
    } finally {
      // Always release the file handle, even if the write fails.
      out.close();
    }
    Shell.execCommand(new String[]{"chmod", "+x",
        scriptFile.getAbsolutePath()});
    return scriptFile;
  }

  /**
   * Writes {@link #input} to the file {@code part0} under the given directory.
   *
   * @param fs  file system to create the file on
   * @param dir directory that will contain the input file
   * @throws IOException if the file cannot be created or written
   */
  private void writeInputFile(FileSystem fs, Path dir) throws IOException {
    DataOutputStream out = fs.create(new Path(dir, "part0"));
    try {
      out.writeBytes(input);
    } finally {
      // Always release the stream, even if the write fails.
      out.close();
    }
  }

  /**
   * Runs the streaming job and validates the output.
   * @throws IOException
   */
  private void runStreamJobAndValidateEnv() throws IOException {
    boolean mayExit = false;
    StreamJob job = new StreamJob(genArgs(), mayExit);
    int returnStatus = job.go();
    assertEquals("StreamJob failed.", 0, returnStatus);

    // validate environment variables set for the child(script) of java process
    String env = TestMiniMRWithDFS.readOutput(outputPath, mr.createJobConf());
    long logSize = USERLOG_LIMIT_KB * 1024;
    assertEnvContains(env, "INFO,TLA");
    assertEnvContains(env, "-Dhadoop.tasklog.taskid=attempt_");
    assertEnvContains(env, "-Dhadoop.tasklog.totalLogFileSize=" + logSize);
  }

  /**
   * Asserts that the job output contains the expected token, reporting both
   * the missing token and the actual output on failure.
   *
   * @param env      output produced by the mapper script
   * @param expected substring that must be present in {@code env}
   */
  private static void assertEnvContains(String env, String expected) {
    assertTrue("environment set for child is wrong: expected to find '"
        + expected + "' in '" + env + "'", env.contains(expected));
  }
}