/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;
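
/**
 * Combiner used by Pig's MapReduce execution engine. The {@link Combine}
 * reducer below is installed on the Hadoop job by the job compiler,
 * roughly as follows (a sketch; the actual wiring lives in
 * JobControlCompiler):
 *
 * <pre>
 *   job.setCombinerClass(PigCombiner.Combine.class);
 * </pre>
 *
 * At runtime it deserializes the combine plan from the job conf and runs
 * each packaged (key, bag) pair through that plan.
 */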
public class PigCombiner {
public static class Combine
extends Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
private static final Log log = LogFactory.getLog(Combine.class);
//HADOOP-3226 Combiners can be called multiple times in both map and reduce
private static boolean firstTime = true;
private byte keyType;
        //The combine plan, executed as the combiner's "reduce" plan
        private PhysicalPlan cp;

        //The POPackage operator, which is the root of every Map Reduce
        //plan, is obtained through the job conf. The portion remaining
        //after its removal is the combine plan.
        private POPackage pack;
ProgressableReporter pigReporter;
PhysicalOperator[] roots;
PhysicalOperator leaf;
private volatile boolean initialized = false;
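
        // Resets static state (the firstTime flag) when the same JVM is
        // reused across jobs, e.g. in local mode or tests, so the
        // alias-location line is logged once per job again.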
//@StaticDataCleanup
public static void staticDataCleanup() {
firstTime = true;
}
        /**
         * Configures the combine plan, the POPackage operator
         * and the progress reporter
         */
@SuppressWarnings("unchecked")
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration jConf = context.getConfiguration();
try {
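                // Restore front-end state serialized into the job conf:
                // the UDF import list, optional log4j settings, the
                // UDFContext, the combine plan and its POPackage root.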
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
Properties log4jProperties = (Properties) ObjectSerializer
.deserialize(jConf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
if (log4jProperties != null) {
PropertyConfigurator.configure(log4jProperties);
}
UDFContext.getUDFContext().reset();
MapRedUtil.setupUDFContext(context.getConfiguration());
cp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
.get("pig.combinePlan"));
pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.combine.package"));
                // To be removed
                if (cp.isEmpty()) {
                    log.debug("Combine Plan empty!");
                } else {
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    cp.explain(baos);
                    log.debug(baos.toString());
                }
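                // The map key type, serialized by the front end as a
                // single-byte array.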
keyType = ((byte[])ObjectSerializer.deserialize(jConf.get("pig.map.keytype")))[0];
// till here
pigReporter = new ProgressableReporter();
if(!(cp.isEmpty())) {
roots = cp.getRoots().toArray(new PhysicalOperator[1]);
leaf = cp.getLeaves().get(0);
}
} catch (IOException ioe) {
String msg = "Problem while configuring combiner's reduce plan.";
throw new RuntimeException(msg, ioe);
}
// Avoid log spamming
if (firstTime) {
log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
firstTime = false;
}
}
        /**
         * The reduce function, which packages the key and List&lt;Tuple&gt;
         * into (key, Bag&lt;Tuple&gt;) after converting the Hadoop key type
         * to the Pig type. The packaged result is either collected as-is,
         * if the combine plan is empty, or after passing through that plan.
         */
@Override
protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
throws IOException, InterruptedException {
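            // One-time wiring of progress reporting and warning aggregation,
            // done lazily on the first reduce() call.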
if(!initialized) {
initialized = true;
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setReporter(pigStatusReporter);
pigHadoopLogger.setAggregate(aggregateWarning);
PhysicalOperator.setPigLogger(pigHadoopLogger);
}
            // In the case we optimize, we combine POPackage and POForeach,
            // so we could get many tuples out of the getNext() call of
            // POJoinPackage. In this case, we process until we see EOP from
            // POJoinPackage.getNext().
if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager)
{
pack.attachInput(key, tupIter.iterator());
while (true)
{
if (processOnePackageOutput(context))
break;
}
}
else {
// not optimized, so package will
// give only one tuple out for the key
pack.attachInput(key, tupIter.iterator());
processOnePackageOutput(context);
}
}
        /**
         * @return false if there may be more output for the current key,
         *         true at the end of processing (EOP seen)
         */
        public boolean processOnePackageOutput(Context oc) throws IOException, InterruptedException {
Result res = pack.getNextTuple();
if(res.returnStatus==POStatus.STATUS_OK){
Tuple packRes = (Tuple)res.result;
if(cp.isEmpty()){
oc.write(null, packRes);
return false;
}
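                    // Feed the packaged tuple to the combine plan's roots and
                    // drain results from its single leaf.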
for (int i = 0; i < roots.length; i++) {
roots[i].attachInput(packRes);
}
while(true){
Result redRes = leaf.getNextTuple();
if(redRes.returnStatus==POStatus.STATUS_OK){
Tuple tuple = (Tuple)redRes.result;
Byte index = (Byte)tuple.get(0);
PigNullableWritable outKey =
HDataType.getWritableComparableTypes(tuple.get(1), this.keyType);
NullableTuple val =
new NullableTuple((Tuple)tuple.get(2));
// Both the key and the value need the index. The key needs it so
// that it can be sorted on the index in addition to the key
// value. The value needs it so that POPackage can properly
// assign the tuple to its slot in the projection.
outKey.setIndex(index);
val.setIndex(index);
oc.write(outKey, val);
continue;
}
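                        // EOP: the plan has been drained for this input.
                        // NULL: nothing to emit this iteration; keep pulling.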
if(redRes.returnStatus==POStatus.STATUS_EOP) {
break;
}
if(redRes.returnStatus==POStatus.STATUS_NULL) {
continue;
}
if(redRes.returnStatus==POStatus.STATUS_ERR){
int errCode = 2090;
String msg = "Received Error while " +
"processing the combine plan.";
if(redRes.result != null) {
msg += redRes.result;
}
throw new ExecException(msg, errCode, PigException.BUG);
}
}
}
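                // Handle the status of the package operator itself: NULL
                // yields nothing for this call, EOP ends the key, ERR is a
                // packaging bug.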
if(res.returnStatus==POStatus.STATUS_NULL) {
return false;
}
if(res.returnStatus==POStatus.STATUS_ERR){
int errCode = 2091;
String msg = "Packaging error while processing group.";
throw new ExecException(msg, errCode, PigException.BUG);
}
if(res.returnStatus==POStatus.STATUS_EOP) {
return true;
}
return false;
}
        /**
         * Called once all the intermediate keys and values have been
         * processed. Cleans up references to the PhysicalPlan.
         */
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
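            // Drop references to the plan and its operators so they can be
            // garbage collected if the JVM/container is reused.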
leaf = null;
pack = null;
pigReporter = null;
// Avoid OOM in Tez.
PhysicalOperator.setReporter(null);
roots = null;
cp = null;
}
/**
* @return the keyType
*/
public byte getKeyType() {
return keyType;
}
/**
* @param keyType the keyType to set
*/
public void setKeyType(byte keyType) {
this.keyType = keyType;
}
}
}