blob: ffa7b014131da1cc8aedfa4d23f6ec7dbc7b6423 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.streaming;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.net.URLDecoder;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.streaming.io.InputWriter;
import org.apache.hadoop.streaming.io.OutputReader;
import org.apache.hadoop.io.Writable;
/** A generic Reducer bridge.
* It delegates operations to an external program via stdin and stdout.
*/
public class PipeReducer extends PipeMapRed implements Reducer {
private byte[] reduceOutFieldSeparator;
private byte[] reduceInputFieldSeparator;
private int numOfReduceOutputKeyFields = 1;
private boolean skipping = false;
String getPipeCommand(JobConf job) {
String str = job.get("stream.reduce.streamprocessor");
if (str == null) {
return str;
}
try {
return URLDecoder.decode(str, "UTF-8");
} catch (UnsupportedEncodingException e) {
System.err.println("stream.reduce.streamprocessor in jobconf not found");
return null;
}
}
boolean getDoPipe() {
String argv = getPipeCommand(job_);
// Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
}
public void configure(JobConf job) {
super.configure(job);
//disable the auto increment of the counter. For streaming, no of
//processed records could be different(equal or less) than the no of
//records input.
SkipBadRecords.setAutoIncrReducerProcCount(job, false);
skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
try {
reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
}
}
public void reduce(Object key, Iterator values, OutputCollector output,
Reporter reporter) throws IOException {
// init
if (doPipe_ && outThread_ == null) {
startOutputThreads(output, reporter);
}
try {
while (values.hasNext()) {
Writable val = (Writable) values.next();
numRecRead_++;
maybeLogRecord();
if (doPipe_) {
if (outerrThreadsThrowable != null) {
mapRedFinished();
throw new IOException("MROutput/MRErrThread failed:",
outerrThreadsThrowable);
}
inWriter_.writeKey(key);
inWriter_.writeValue(val);
} else {
// "identity reduce"
output.collect(key, val);
}
}
if(doPipe_ && skipping) {
//flush the streams on every record input if running in skip mode
//so that we don't buffer other records surrounding a bad record.
clientOut_.flush();
}
} catch (IOException io) {
// a common reason to get here is failure of the subprocess.
// Document that fact, if possible.
String extraInfo = "";
try {
int exitVal = sim.exitValue();
if (exitVal == 0) {
extraInfo = "subprocess exited successfully\n";
} else {
extraInfo = "subprocess exited with error code " + exitVal + "\n";
};
} catch (IllegalThreadStateException e) {
// hmm, but child is still running. go figure.
extraInfo = "subprocess still running\n";
};
mapRedFinished();
throw new IOException(extraInfo + getContext() + io.getMessage());
}
}
public void close() {
mapRedFinished();
}
@Override
public byte[] getInputSeparator() {
return reduceInputFieldSeparator;
}
@Override
public byte[] getFieldSeparator() {
return reduceOutFieldSeparator;
}
@Override
public int getNumOfKeyFields() {
return numOfReduceOutputKeyFields;
}
@Override
InputWriter createInputWriter() throws IOException {
return super.createInputWriter(reduceInputWriterClass_);
}
@Override
OutputReader createOutputReader() throws IOException {
return super.createOutputReader(reduceOutputReaderClass_);
}
}