blob: 555c2c59b2a4a7e1ee75b08f69ee21d7b8db3ef9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package testjar;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import java.util.Random;
import java.io.IOException;
import java.io.DataOutputStream;
import java.io.File;
/**
* It uses for defining a various types of mapperes with child processes.
*/
public class GenerateTaskChildProcess {
private static final Log LOG = LogFactory
.getLog(GenerateTaskChildProcess.class);
/**
* It uses for defining the string appending mapper with
* child processes.It's keep appending the string and increases
* the memory continuously.
*/
public static class StrAppendMapper extends MapReduceBase implements
Mapper<LongWritable, Text, NullWritable, NullWritable> {
private JobConf conf;
public void configure(JobConf conf) {
this.conf = conf;
}
public void map(LongWritable key, Text value,
OutputCollector<NullWritable, NullWritable> out,
Reporter reporter) throws IOException {
int counter = 0;
while (counter < 30) {
try {
reporter.progress();
synchronized (this) {
this.wait(1000);
}
} catch (InterruptedException iexp) {
iexp.printStackTrace();
LOG.warn("Interrupted while the map was waiting.");
}
counter ++;
}
try {
createChildProcess(conf, "AppendStr");
} catch (Exception exp) {
exp.printStackTrace();
LOG.warn("Exception thrown while creating the child processes");
}
}
public void close() {
}
}
/**
* It uses for defining the String display mapper with child processes.
*/
public static class StrDisplayMapper extends MapReduceBase implements
Mapper<LongWritable, Text, NullWritable, NullWritable> {
private JobConf conf;
public void configure(JobConf conf) {
this.conf = conf;
}
public void map(LongWritable key, Text value,
OutputCollector<NullWritable, NullWritable> out,
Reporter reporter) throws IOException {
int counter = 0;
while (counter < 30) {
try {
reporter.progress();
synchronized (this) {
this.wait(1000);
}
} catch (InterruptedException iexp) {
iexp.printStackTrace();
LOG.warn("Interrupted while the map was waiting.");
break;
}
counter ++;
}
try {
createChildProcess(conf, "DispStr");
} catch (Exception exp) {
exp.printStackTrace();
LOG.warn("Exception thrown while creating the child processes.");
}
}
}
/**
* It uses for defining a failed mapper with child processes.
*
*/
public static class FailedMapper extends MapReduceBase implements
Mapper<LongWritable, Text, NullWritable, NullWritable> {
private JobConf conf;
public void configure(JobConf conf) {
try {
createChildProcess(conf, "failedmapper");
} catch (Exception exp) {
exp.printStackTrace();
LOG.warn("Exception throw while creating the child processes");
}
}
public void map(LongWritable key, Text value,
OutputCollector<NullWritable, NullWritable> out,
Reporter reporter) throws IOException {
int counter = 0;
while (counter < 30) {
try {
reporter.progress();
synchronized (this) {
this.wait(1000);
}
} catch (InterruptedException iexp) {
iexp.printStackTrace();
LOG.warn("Interrupted while the map was waiting.");
break;
}
counter ++;
}
throw new RuntimeException("Mapper failed.");
}
}
/**
* It uses for failing the map tasks.
*
*/
public static class FailMapper extends MapReduceBase implements
Mapper<LongWritable, Text, NullWritable, NullWritable> {
public void map(LongWritable key, Text value,
OutputCollector<NullWritable, NullWritable> out,
Reporter reporter) throws IOException {
throw new RuntimeException("failing the map");
}
}
/**
* It uses for creating the child processes for a task.
* @param conf configuration for a job.
* @param jobName the name of the mapper job.
* @throws IOException if an I/O error occurs.
*/
private static void createChildProcess(JobConf conf, String jobName)
throws IOException {
FileSystem fs = FileSystem.getLocal(conf);
File TMP_ROOT_DIR = new File("/tmp");
String TEST_ROOT_DIR = TMP_ROOT_DIR.getAbsolutePath()
+ Path.SEPARATOR + "ChildProc_" + jobName;
Path scriptDir = new Path(TEST_ROOT_DIR);
int numOfChildProcesses = 2;
if (fs.exists(scriptDir)) {
fs.delete(scriptDir, true);
}
fs.mkdirs(scriptDir);
fs.setPermission(scriptDir, new FsPermission(FsAction.ALL,
FsAction.ALL, FsAction.ALL));
String scriptDirName = scriptDir.toUri().getPath();
Random rm = new Random();
String scriptName = "ShellScript_" + jobName + "_"
+ rm.nextInt() + ".sh";
Path scriptPath = new Path(scriptDirName, scriptName);
String shellScript = scriptPath.toString();
String script = null;
if (jobName.equals("AppendStr")) {
script = "#!/bin/sh\n"
+ "umask 000\n"
+ "StrVal=\"Hadoop is framework for data intensive "
+ "distributed applications.\"\n"
+ "StrVal=\"${StrVal}Hadoop enables applications to work "
+ "with thousands of nodes.\"\n"
+ "echo $StrVal\n"
+ "if [ \"X$1\" != \"X0\" ]\nthen\n"
+ " sh " + shellScript + " $(($1-1))\n"
+ "else\n"
+ " while(true)\n"
+ " do\n"
+ " StrVal=\"$StrVal Hadoop \"\n"
+ " done\n"
+ "fi";
} else if (jobName.equals("DispStr")) {
script = "#!/bin/sh\n"
+ "umask 000\n"
+ "msg=Welcome\n"
+ "echo $msg\n"
+ " if [ \"X$1\" != \"X0\" ]\nthen\n"
+ " sh " + shellScript + " $(($1-1))\n"
+ "else\n"
+ " while(true)\n"
+ " do\n"
+ " sleep 2 \n"
+ " done\n"
+ "fi";
}else {
script = "#!/bin/sh\n"
+ "umask 000\n"
+ "msg=Welcome\n"
+ "echo $msg\n"
+ " if [ \"X$1\" != \"X0\" ]\nthen\n"
+ " sh " + shellScript + " $(($1-1))\n"
+ "else\n"
+ " for count in {1..1000}\n"
+ " do\n"
+ " echo \"$msg_$count\" \n"
+ " done\n"
+ "fi";
}
DataOutputStream file = fs.create(scriptPath);
file.writeBytes(script);
file.close();
File scriptFile = new File(scriptDirName,scriptName);
scriptFile.setExecutable(true);
LOG.info("script absolute path:" + scriptFile.getAbsolutePath());
String [] cmd = new String[]{scriptFile.getAbsolutePath(),
String.valueOf(numOfChildProcesses)};
ShellCommandExecutor shellExec = new ShellCommandExecutor(cmd);
shellExec.execute();
}
}