/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.streaming;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputLogFilter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
public class TestStreamingBadRecords extends ClusterMapReduceTestCase {
private static final Log LOG =
LogFactory.getLog(TestStreamingBadRecords.class);
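// Records that the mapper and reducer deliberately fail on; the test
// expects the skipping feature to keep them out of the final output.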
private static final List<String> MAPPER_BAD_RECORDS =
Arrays.asList("hey022","hey023","hey099");
private static final List<String> REDUCER_BAD_RECORDS =
Arrays.asList("hey001","hey018");
private static final String badMapper =
StreamUtil.makeJavaCommand(BadApp.class, new String[]{});
private static final String badReducer =
StreamUtil.makeJavaCommand(BadApp.class, new String[]{"true"});
private static final int INPUTSIZE=100;
public TestStreamingBadRecords() throws IOException {
UtilTest utilTest = new UtilTest(getClass().getName());
utilTest.checkUserDir();
utilTest.redirectIfAntJunit();
}
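// Keep completed jobs from being retired so their counters remain
// available to the assertions after the run.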
protected void setUp() throws Exception {
Properties props = new Properties();
props.setProperty(JTConfig.JT_RETIREJOBS, "false");
startCluster(true, props);
}
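// Writes INPUTSIZE records named hey001..hey100, each padded to roughly
// 20 KB, into text.txt in the input directory.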
private void createInput() throws Exception {
OutputStream os = getFileSystem().create(new Path(getInputDir(),
"text.txt"));
Writer wr = new OutputStreamWriter(os);
// Pad each record so it is large enough that stream flushing actually occurs.
String prefix = new String(new byte[20*1024]);
for(int i=1;i<=INPUTSIZE;i++) {
String str = ""+i;
int zerosToPrepend = 3 - str.length();
for(int j=0;j<zerosToPrepend;j++){
str = "0"+str;
}
wr.write(prefix + "hey"+str+"\n");
}
wr.close();
}
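// Checks that the job succeeded, optionally verifies the skip-related
// counters, and asserts that none of the bad records reached the output.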
private void validateOutput(RunningJob runningJob, boolean validateCount)
throws Exception {
LOG.info(runningJob.getCounters().toString());
assertTrue(runningJob.isSuccessful());
if(validateCount) {
//validate counters
String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
Counters counters = runningJob.getCounters();
assertEquals(counters.findCounter(counterGrp, "MAP_SKIPPED_RECORDS").
getCounter(),MAPPER_BAD_RECORDS.size());
int mapRecs = INPUTSIZE - MAPPER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "MAP_INPUT_RECORDS").
getCounter(),mapRecs);
assertEquals(counters.findCounter(counterGrp, "MAP_OUTPUT_RECORDS").
getCounter(),mapRecs);
int redRecs = mapRecs - REDUCER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_RECORDS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_GROUPS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_GROUPS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_RECORDS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_OUTPUT_RECORDS").
getCounter(),redRecs);
}
List<String> badRecs = new ArrayList<String>();
badRecs.addAll(MAPPER_BAD_RECORDS);
badRecs.addAll(REDUCER_BAD_RECORDS);
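// Read the job output (OutputLogFilter excludes the _logs directory) and
// make sure no bad record slipped through.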
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new OutputLogFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
int counter = 0;
while (line != null) {
counter++;
StringTokenizer tokeniz = new StringTokenizer(line, "\t");
String value = tokeniz.nextToken();
int index = value.indexOf("hey");
assertTrue(index>-1);
if(index>-1) {
String heyStr = value.substring(index);
assertTrue(!badRecs.contains(heyStr));
}
line = reader.readLine();
}
reader.close();
if(validateCount) {
assertEquals(INPUTSIZE-badRecs.size(), counter);
}
}
}
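// Skipping starts on the first attempt and the skip range is unbounded,
// so each bad record costs one extra task attempt. Output counts are not
// validated strictly because good records near a failure may be skipped
// along with it.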
public void testSkip() throws Exception {
JobConf clusterConf = createJobConf();
createInput();
int attSkip =0;
SkipBadRecords.setAttemptsToStartSkipping(clusterConf,attSkip);
// The number of attempts needed to complete a task successfully
// depends on the number of bad records it hits.
int mapperAttempts = attSkip+1+MAPPER_BAD_RECORDS.size();
int reducerAttempts = attSkip+1+REDUCER_BAD_RECORDS.size();
String[] args = new String[] {
"-input", (new Path(getInputDir(), "text.txt")).toString(),
"-output", getOutputDir().toString(),
"-mapper", badMapper,
"-reducer", badReducer,
"-verbose",
"-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
"-jobconf", "mapreduce.task.skip.start.attempts="+attSkip,
"-jobconf", "mapreduce.job.skip.outdir=none",
"-jobconf", "mapreduce.map.maxattempts="+mapperAttempts,
"-jobconf", "mapreduce.reduce.maxattempts="+reducerAttempts,
"-jobconf", "mapreduce.map.skip.maxrecords="+Long.MAX_VALUE,
"-jobconf", "mapreduce.reduce.skip.maxgroups="+Long.MAX_VALUE,
"-jobconf", "mapreduce.job.maps=1",
"-jobconf", "mapreduce.job.reduces=1",
"-jobconf", "fs.default.name="+clusterConf.get("fs.default.name"),
"-jobconf", "mapreduce.jobtracker.address=" +
clusterConf.get(JTConfig.JT_IPC_ADDRESS),
"-jobconf", "mapreduce.jobtracker.http.address="
+clusterConf.get(JTConfig.JT_HTTP_ADDRESS),
"-jobconf", "stream.debug=set",
"-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
"-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
};
StreamJob job = new StreamJob(args, false);
job.go();
validateOutput(job.running_, false);
// Validate that no skip output directory exists, since it was set to "none".
assertTrue(SkipBadRecords.getSkipOutputPath(job.jobConf_)==null);
}
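// With the skip range capped at a single record/group, the framework has
// to narrow each failure down to the one bad record, so the counters and
// the output record count can be asserted exactly.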
public void testNarrowDown() throws Exception {
createInput();
JobConf clusterConf = createJobConf();
String[] args = new String[] {
"-input", (new Path(getInputDir(), "text.txt")).toString(),
"-output", getOutputDir().toString(),
"-mapper", badMapper,
"-reducer", badReducer,
"-verbose",
"-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
"-jobconf", "mapreduce.task.skip.start.attempts=1",
// Fewer attempts are actually required than specified below, but the
// extra headroom covers the case where processed-record counter updates
// arrive slowly.
"-jobconf", "mapreduce.map.maxattempts=20",
"-jobconf", "mapreduce.reduce.maxattempts=15",
"-jobconf", "mapreduce.map.skip.maxrecords=1",
"-jobconf", "mapreduce.reduce.skip.maxgroups=1",
"-jobconf", "mapreduce.job.maps=1",
"-jobconf", "mapreduce.job.reduces=1",
"-jobconf", "fs.default.name="+clusterConf.get("fs.default.name"),
"-jobconf", "mapreduce.jobtracker.address="+clusterConf.get(JTConfig.JT_IPC_ADDRESS),
"-jobconf", "mapreduce.jobtracker.http.address="
+clusterConf.get(JTConfig.JT_HTTP_ADDRESS),
"-jobconf", "stream.debug=set",
"-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
"-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
};
StreamJob job = new StreamJob(args, false);
job.go();
validateOutput(job.running_, true);
assertTrue(SkipBadRecords.getSkipOutputPath(job.jobConf_)!=null);
}
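// Minimal streaming program used as both mapper and reducer: it echoes
// every input line and periodically reports the processed-record counter
// that the skipping feature relies on.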
static class App{
boolean isReducer;
public App(String[] args) throws Exception{
if(args.length>0) {
isReducer = Boolean.parseBoolean(args[0]);
}
String counter = SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS;
if(isReducer) {
counter = SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS;
}
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
String line;
int count = 0;
while ((line = in.readLine()) != null) {
processLine(line);
count++;
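// Every 10 lines, report the processed count through the streaming
// counter protocol (reporter:counter:<group>,<counter>,<amount> on stderr).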
if(count>=10) {
System.err.println("reporter:counter:"+SkipBadRecords.COUNTER_GROUP+
","+counter+","+count);
count = 0;
}
}
}
protected void processLine(String line) throws Exception{
System.out.println(line);
}
public static void main(String[] args) throws Exception{
new App(args);
}
}
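// Variant of App that simulates bad records: it calls System.exit(-1) or
// throws when a line contains one of the configured bad keys.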
static class BadApp extends App{
public BadApp(String[] args) throws Exception {
super(args);
}
protected void processLine(String line) throws Exception {
List<String> badRecords = MAPPER_BAD_RECORDS;
if(isReducer) {
badRecords = REDUCER_BAD_RECORDS;
}
if(badRecords.size()>0 && line.contains(badRecords.get(0))) {
LOG.warn("Encountered BAD record");
System.exit(-1);
}
else if(badRecords.size()>1 && line.contains(badRecords.get(1))) {
LOG.warn("Encountered BAD record");
throw new Exception("Got bad record..crashing");
}
else if(badRecords.size()>2 && line.contains(badRecords.get(2))) {
LOG.warn("Encountered BAD record");
System.exit(-1);
}
super.processLine(line);
}
public static void main(String[] args) throws Exception{
new BadApp(args);
}
}
}