| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.mapred.jobcontrol.Job; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.Reducer; |
| import org.apache.pig.PigConstants; |
| import org.apache.pig.PigException; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.backend.hadoop.HDataType; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; |
| import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; |
| import org.apache.pig.data.DataBag; |
| import org.apache.pig.data.DataType; |
| import org.apache.pig.data.SchemaTupleBackend; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.io.NullablePartitionWritable; |
| import org.apache.pig.impl.io.NullableTuple; |
| import org.apache.pig.impl.io.PigNullableWritable; |
| import org.apache.pig.impl.plan.DependencyOrderWalker; |
| import org.apache.pig.impl.plan.VisitorException; |
| import org.apache.pig.impl.util.ObjectSerializer; |
| import org.apache.pig.impl.util.Pair; |
| import org.apache.pig.impl.util.SpillableMemoryManager; |
| import org.apache.pig.impl.util.UDFContext; |
| import org.apache.pig.impl.util.Utils; |
| import org.apache.pig.tools.pigstats.PigStatusReporter; |
| |
| /** |
| * This class holds the static Mapper and Reducer classes that |
| * are used by Pig to execute Pig MapReduce jobs. Since |
| * there is a reduce phase, the leaf of the map plan is bound to be a |
| * POLocalRearrange. So the map phase has to separate the |
| * key and tuple and collect them into the output |
| * collector. |
| * |
| * The shuffle and sort phase sorts these keys & tuples |
| * and, for each key, builds a key, List<Tuple> pair, passing the key and |
| * an iterator over the list to the reducer. The deserialized POPackage operator |
| * is used to package the key, List<Tuple> into pigKey, |
| * Bag<Tuple>, where pigKey is of the appropriate Pig type, and |
| * the result of the package is then attached to the reduce |
| * plan, which is executed if it is not empty. Either the result |
| * of the reduce plan or the package result is collected into |
| * the output collector. |
| * |
| * The index of the tuple (that is, which bag it should be placed in by the |
| * package) is packed into the key. This is done so that Hadoop sorts the |
| * keys in order of index for join. |
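| * |
| * A rough sketch of the map-side emit (identifiers are illustrative; the |
| * actual code is in Map.collect() below) for a tuple of the form |
| * (index, key, value) produced by POLocalRearrange: |
| * <pre> |
| * Byte index = (Byte) t.get(0); |
| * PigNullableWritable k = HDataType.getWritableComparableTypes(t.get(1), keyType); |
| * NullableTuple v = new NullableTuple((Tuple) t.get(2)); |
| * k.setIndex(index);   // the index takes part in the sort, for join |
| * v.setIndex(index);   // lets POPackage place the tuple in the right bag |
| * context.write(k, v); |
| * </pre> |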
| * |
| * This class is the base class for PigMapReduce, which differs slightly |
| * among different versions of Hadoop. The PigMapReduce implementation |
| * is located in $PIG_HOME/shims. |
| */ |
| public class PigGenericMapReduce { |
| |
| public static JobContext sJobContext = null; |
| |
| /** |
| * @deprecated Use {@link UDFContext} instead in the following way to get |
| * the job's {@link Configuration}: |
| * <pre>UDFContext.getUDFContext().getJobConf()</pre> |
| */ |
| @Deprecated |
| public static Configuration sJobConf = null; |
| |
| public static ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>(); |
| |
| //@StaticDataCleanup |
| public static void staticDataCleanup() { |
| sJobContext = null; |
| sJobConf = null; |
| sJobConfInternal = new ThreadLocal<Configuration>(); |
| } |
| |
| public static class Map extends PigMapBase { |
| |
| @Override |
| public void collect(Context oc, Tuple tuple) |
| throws InterruptedException, IOException { |
| |
| Byte index = (Byte)tuple.get(0); |
| PigNullableWritable key = |
| HDataType.getWritableComparableTypes(tuple.get(1), keyType); |
| NullableTuple val = new NullableTuple((Tuple)tuple.get(2)); |
| |
| // Both the key and the value need the index. The key needs it so |
| // that it can be sorted on the index in addition to the key |
| // value. The value needs it so that POPackage can properly |
| // assign the tuple to its slot in the projection. |
| key.setIndex(index); |
| val.setIndex(index); |
| |
| oc.write(key, val); |
| } |
| } |
| |
| /** |
| * This "specialized" map class is ONLY to be used in pig queries with |
| * order by a udf. A UDF used for comparison in the order by expects |
| * to be handed tuples. Hence this map class ensures that the "key" used |
| * in the order by is wrapped into a tuple (if it isn't already a tuple) |
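| * |
| * A minimal sketch of the wrapping, assuming a non-tuple key such as a |
| * chararray (variable names here are illustrative only): |
| * <pre> |
| * Object k = "foo";                                        // non-tuple key |
| * Tuple keyTuple = TupleFactory.getInstance().newTuple(k); // ("foo") |
| * // the comparator UDF for the order by is always handed a Tuple |
| * </pre> |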
| */ |
| public static class MapWithComparator extends PigMapBase { |
| |
| @Override |
| public void collect(Context oc, Tuple tuple) |
| throws InterruptedException, IOException { |
| |
| Object keyTuple = null; |
| if(keyType != DataType.TUPLE) { |
| Object k = tuple.get(1); |
| keyTuple = tf.newTuple(k); |
| } else { |
| keyTuple = tuple.get(1); |
| } |
| |
| |
| Byte index = (Byte)tuple.get(0); |
| PigNullableWritable key = |
| HDataType.getWritableComparableTypes(keyTuple, DataType.TUPLE); |
| NullableTuple val = new NullableTuple((Tuple)tuple.get(2)); |
| |
| // Both the key and the value need the index. The key needs it so |
| // that it can be sorted on the index in addition to the key |
| // value. The value needs it so that POPackage can properly |
| // assign the tuple to its slot in the projection. |
| key.setIndex(index); |
| val.setIndex(index); |
| |
| oc.write(key, val); |
| } |
| } |
| |
| /** |
| * Used by skewed join. In addition to the bag index, the map output key is |
| * wrapped in a NullablePartitionWritable carrying the partition index computed |
| * for the key, so that values of a skewed key can be spread across reducers. |
| */ |
| public static class MapWithPartitionIndex extends Map { |
| |
| @Override |
| public void collect(Context oc, Tuple tuple) |
| throws InterruptedException, IOException { |
| |
| Byte tupleKeyIdx = 2; |
| Byte tupleValIdx = 3; |
| |
| Byte index = (Byte)tuple.get(0); |
| Integer partitionIndex = -1; |
| // for partitioning table, the partition index isn't present |
| if (tuple.size() == 3) { |
| //super.collect(oc, tuple); |
| //return; |
| tupleKeyIdx--; |
| tupleValIdx--; |
| } else { |
| partitionIndex = (Integer)tuple.get(1); |
| } |
| |
| PigNullableWritable key = |
| HDataType.getWritableComparableTypes(tuple.get(tupleKeyIdx), keyType); |
| |
| NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key); |
| |
| NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx)); |
| |
| // Both the key and the value need the index. The key needs it so |
| // that it can be sorted on the index in addition to the key |
| // value. The value needs it so that POPackage can properly |
| // assign the tuple to its slot in the projection. |
| wrappedKey.setIndex(index); |
| |
| // set the partition |
| wrappedKey.setPartition(partitionIndex); |
| val.setIndex(index); |
| oc.write(wrappedKey, val); |
| } |
| |
| @Override |
| protected void runPipeline(PhysicalOperator leaf) |
| throws IOException, InterruptedException { |
| |
| while(true){ |
| Result res = leaf.getNextTuple(); |
| |
| if(res.returnStatus==POStatus.STATUS_OK){ |
| // For POPartitionRearrange, the result is a bag. |
| // This operator is used for skewed join |
| if (res.result instanceof DataBag) { |
| Iterator<Tuple> its = ((DataBag)res.result).iterator(); |
| while(its.hasNext()) { |
| collect(outputCollector, its.next()); |
| } |
| }else{ |
| collect(outputCollector, (Tuple)res.result); |
| } |
| continue; |
| } |
| |
| if(res.returnStatus==POStatus.STATUS_EOP) { |
| return; |
| } |
| |
| if(res.returnStatus==POStatus.STATUS_NULL) { |
| continue; |
| } |
| |
| if(res.returnStatus==POStatus.STATUS_ERR){ |
| // remember that we had an issue so that in |
| // close() we can do the right thing |
| errorInMap = true; |
| // if there is an error message, use it |
| String errMsg; |
| if(res.result != null) { |
| errMsg = "Received Error while " + |
| "processing the map plan: " + res.result; |
| } else { |
| errMsg = "Received Error while " + |
| "processing the map plan."; |
| } |
| |
| int errCode = 2055; |
| throw new ExecException(errMsg, errCode, PigException.BUG); |
| } |
| } |
| } |
| } |
| |
| abstract public static class Reduce |
| extends Reducer <PigNullableWritable, NullableTuple, PigNullableWritable, Writable> { |
| |
| protected final Log log = LogFactory.getLog(getClass()); |
| |
| //The reduce plan |
| protected PhysicalPlan rp = null; |
| |
| // Store operators |
| protected List<POStore> stores; |
| |
| //The POPackage operator, which is the |
| //root of the reduce side of every MapReduce |
| //plan, is obtained through the job conf. |
| //The portion remaining after its removal |
| //is the reduce plan |
| protected POPackage pack; |
| |
| ProgressableReporter pigReporter; |
| |
| protected Context outputCollector; |
| |
| protected boolean errorInReduce = false; |
| |
| PhysicalOperator[] roots; |
| |
| private PhysicalOperator leaf; |
| |
| protected volatile boolean initialized = false; |
| |
| private boolean inIllustrator = false; |
| |
| /** |
| * Set the reduce plan: to be used by local runner for illustrator |
| * @param plan Reduce plan |
| */ |
| public void setReducePlan(PhysicalPlan plan) { |
| rp = plan; |
| } |
| |
| /** |
| * Configures the Reduce plan, the POPackage operator |
| * and the reporter thread |
| */ |
| @SuppressWarnings("unchecked") |
| @Override |
| protected void setup(Context context) throws IOException, InterruptedException { |
| super.setup(context); |
| inIllustrator = inIllustrator(context); |
| if (inIllustrator) |
| pack = getPack(context); |
| Configuration jConf = context.getConfiguration(); |
| SpillableMemoryManager.getInstance().configure(jConf); |
| context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId())); |
| sJobContext = context; |
| sJobConfInternal.set(context.getConfiguration()); |
| sJobConf = context.getConfiguration(); |
| try { |
| PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list"))); |
| |
| // This attempts to fetch all of the generated code from the distributed cache, and resolve it |
| SchemaTupleBackend.initialize(jConf); |
| |
| if (rp == null) |
| rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf |
| .get("pig.reducePlan")); |
| stores = PlanHelper.getPhysicalOperators(rp, POStore.class); |
| |
| if (!inIllustrator) |
| pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package")); |
| // To be removed |
| if(rp.isEmpty()) |
| log.debug("Reduce Plan empty!"); |
| else{ |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| rp.explain(baos); |
| log.debug(baos.toString()); |
| } |
| pigReporter = new ProgressableReporter(); |
| if(!(rp.isEmpty())) { |
| roots = rp.getRoots().toArray(new PhysicalOperator[1]); |
| leaf = rp.getLeaves().get(0); |
| } |
| |
| // Get the UDF specific context |
| MapRedUtil.setupUDFContext(jConf); |
| |
| } catch (IOException ioe) { |
| String msg = "Problem while configuring reduce plan."; |
| throw new RuntimeException(msg, ioe); |
| } |
| |
| log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location")); |
| |
| Utils.setDefaultTimeZone(PigMapReduce.sJobConfInternal.get()); |
| } |
| |
| /** |
| * The reduce function, which packages the key and List<Tuple> |
| * into key, Bag<Tuple> after converting the Hadoop-typed key into its Pig type. |
| * The package result is collected as is if the reduce plan is |
| * empty, or otherwise after passing through the reduce plan. |
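| * |
| * For illustration (the values are made up): for a two-input COGROUP on key 'k', |
| * the packaged tuple has the shape (pigKey, one bag per input), e.g. |
| * <pre> |
| * ('k', {(k,1),(k,2)}, {(k,'a')}) |
| * </pre> |
| * and that tuple is what gets attached to the roots of the reduce plan. |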
| */ |
| @Override |
| protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) |
| throws IOException, InterruptedException { |
| |
| if (!initialized) { |
| initialized = true; |
| |
| // cache the collector for use in runPipeline() |
| // which could additionally be called from close() |
| this.outputCollector = context; |
| pigReporter.setRep(context); |
| PhysicalOperator.setReporter(pigReporter); |
| |
| boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning")); |
| PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); |
| pigStatusReporter.setContext(new MRTaskContext(context)); |
| PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); |
| pigHadoopLogger.setReporter(pigStatusReporter); |
| pigHadoopLogger.setAggregate(aggregateWarning); |
| PhysicalOperator.setPigLogger(pigHadoopLogger); |
| |
| if (!inIllustrator) |
| for (POStore store: stores) { |
| MapReducePOStoreImpl impl |
| = new MapReducePOStoreImpl(context); |
| store.setStoreImpl(impl); |
| store.setUp(); |
| } |
| } |
| |
| // In the case where we optimize the join, we combine |
| // POPackage and POForeach - so we could get many |
| // tuples out of the getNext() call of POJoinPackage. |
| // In this case, we process till we see EOP from |
| // POJoinPackage.getNext() |
| if (pack.getPkgr() instanceof JoinPackager) |
| { |
| pack.attachInput(key, tupIter.iterator()); |
| while (true) |
| { |
| if (processOnePackageOutput(context)) |
| break; |
| } |
| } |
| else { |
| // join is not optimized, so package will |
| // give only one tuple out for the key |
| pack.attachInput(key, tupIter.iterator()); |
| processOnePackageOutput(context); |
| } |
| } |
| |
| /** |
| * Runs the packager once and, if it produced a tuple, pushes the result |
| * through the reduce plan (or writes it out directly if the plan is empty). |
| * |
| * @return false if there may be more output, true at end of processing |
| */ |
| public boolean processOnePackageOutput(Context oc) |
| throws IOException, InterruptedException { |
| |
| Result res = pack.getNextTuple(); |
| if(res.returnStatus==POStatus.STATUS_OK){ |
| Tuple packRes = (Tuple)res.result; |
| |
| if(rp.isEmpty()){ |
| oc.write(null, packRes); |
| return false; |
| } |
| for (int i = 0; i < roots.length; i++) { |
| roots[i].attachInput(packRes); |
| } |
| runPipeline(leaf); |
| |
| } |
| |
| if(res.returnStatus==POStatus.STATUS_NULL) { |
| return false; |
| } |
| |
| if(res.returnStatus==POStatus.STATUS_ERR){ |
| int errCode = 2093; |
| String msg = "Encountered error in package operator while processing group."; |
| throw new ExecException(msg, errCode, PigException.BUG); |
| } |
| |
| if(res.returnStatus==POStatus.STATUS_EOP) { |
| return true; |
| } |
| |
| return false; |
| |
| } |
| |
| /** |
| * Pulls tuples from the given leaf of the reduce plan and writes them to |
| * the output collector until the plan reports end of processing. |
| * @param leaf the leaf operator of the reduce plan |
| * @throws InterruptedException |
| * @throws IOException |
| */ |
| protected void runPipeline(PhysicalOperator leaf) |
| throws InterruptedException, IOException { |
| |
| while(true) |
| { |
| Result redRes = leaf.getNextTuple(); |
| if(redRes.returnStatus==POStatus.STATUS_OK){ |
| try{ |
| outputCollector.write(null, (Tuple)redRes.result); |
| }catch(Exception e) { |
| throw new IOException(e); |
| } |
| continue; |
| } |
| |
| if(redRes.returnStatus==POStatus.STATUS_EOP) { |
| return; |
| } |
| |
| if(redRes.returnStatus==POStatus.STATUS_NULL) { |
| continue; |
| } |
| |
| if(redRes.returnStatus==POStatus.STATUS_ERR){ |
| // remember that we had an issue so that in |
| // close() we can do the right thing |
| errorInReduce = true; |
| // if there is an error message, use it |
| String msg; |
| if(redRes.result != null) { |
| msg = "Received Error while " + |
| "processing the reduce plan: " + redRes.result; |
| } else { |
| msg = "Received Error while " + |
| "processing the reduce plan."; |
| } |
| int errCode = 2090; |
| throw new ExecException(msg, errCode, PigException.BUG); |
| } |
| } |
| } |
| |
| /** |
| * Called once all the intermediate keys and values have been |
| * processed, so this is the right place to stop the reporter thread. |
| */ |
| @Override |
| protected void cleanup(Context context) throws IOException, InterruptedException { |
| super.cleanup(context); |
| |
| if(errorInReduce) { |
| // there was an error in reduce - just return |
| return; |
| } |
| |
| if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true") && !rp.isEmpty()) { |
| // If there is a stream in the pipeline we could |
| // potentially have more to process - so let's |
| // set the flag stating that all map input has been sent |
| // already and then run the pipeline one more time. |
| // This will result in nothing happening in the case |
| // where there is no stream in the pipeline. |
| rp.endOfAllInput = true; |
| runPipeline(leaf); |
| } |
| |
| if (!inIllustrator) { |
| for (POStore store: stores) { |
| if (!initialized) { |
| MapReducePOStoreImpl impl |
| = new MapReducePOStoreImpl(context); |
| store.setStoreImpl(impl); |
| store.setUp(); |
| } |
| store.tearDown(); |
| } |
| } |
| |
| //Calling EvalFunc.finish() |
| UDFFinishVisitor finisher = new UDFFinishVisitor(rp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(rp)); |
| try { |
| finisher.visit(); |
| } catch (VisitorException e) { |
| throw new IOException("Error trying to finish UDFs",e); |
| } |
| |
| PhysicalOperator.setReporter(null); |
| initialized = false; |
| } |
| |
| /** |
| * Get reducer's illustrator context |
| * |
| * @param job the job being executed |
| * @param input Input buffer as output by maps |
| * @param pkg the POPackage operator to use |
| * @return reducer's illustrator context |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| abstract public Context getIllustratorContext(Job job, |
| List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException; |
| |
| abstract public boolean inIllustrator(Context context); |
| |
| abstract public POPackage getPack(Context context); |
| } |
| |
| /** |
| * This "specialized" reduce class is ONLY to be used in pig queries with |
| * order by a udf. A UDF used for comparison in the order by expects |
| * to be handed tuples. Hence a specialized map class (PigMapReduce.MapWithComparator) |
| * ensures that the "key" used in the order by is wrapped into a tuple (if it |
| * isn't already a tuple). This reduce class unwraps this tuple in the case where |
| * the map had wrapped into a tuple and handes the "unwrapped" key to the POPackage |
| * for processing |
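| * |
| * A minimal sketch of the unwrapping, assuming a non-tuple key that the map |
| * wrapped as ("foo") (variable names here are illustrative only): |
| * <pre> |
| * Tuple t = (Tuple) key.getValueAsPigType();                       // ("foo") |
| * key = HDataType.getWritableComparableTypes(t.get(0), keyType);   // "foo" |
| * </pre> |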
| */ |
| public static class ReduceWithComparator extends PigMapReduce.Reduce { |
| |
| private byte keyType; |
| |
| /** |
| * Configures the Reduce plan, the POPackage operator |
| * and the reporter thread |
| */ |
| @Override |
| protected void setup(Context context) throws IOException, InterruptedException { |
| super.setup(context); |
| keyType = pack.getPkgr().getKeyType(); |
| } |
| |
| /** |
| * The reduce function, which packages the key and List<Tuple> |
| * into key, Bag<Tuple> after converting the Hadoop-typed key into its Pig type. |
| * The package result is collected as is if the reduce plan is |
| * empty, or otherwise after passing through the reduce plan. |
| */ |
| @Override |
| protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) |
| throws IOException, InterruptedException { |
| |
| if (!initialized) { |
| initialized = true; |
| |
| // cache the collector for use in runPipeline() |
| // which could additionally be called from close() |
| this.outputCollector = context; |
| pigReporter.setRep(context); |
| PhysicalOperator.setReporter(pigReporter); |
| |
| boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning")); |
| PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); |
| pigStatusReporter.setContext(new MRTaskContext(context)); |
| PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); |
| pigHadoopLogger.setReporter(pigStatusReporter); |
| pigHadoopLogger.setAggregate(aggregateWarning); |
| PhysicalOperator.setPigLogger(pigHadoopLogger); |
| |
| for (POStore store: stores) { |
| MapReducePOStoreImpl impl |
| = new MapReducePOStoreImpl(context); |
| store.setStoreImpl(impl); |
| store.setUp(); |
| } |
| } |
| |
| // If the keyType is not a tuple, MapWithComparator.collect() |
| // would have wrapped the key into a tuple so that the |
| // comparison UDF used in the order by can process it. |
| // We need to unwrap the key out of the tuple and hand it |
| // to the POPackage for processing. |
| if(keyType != DataType.TUPLE) { |
| Tuple t = (Tuple)(key.getValueAsPigType()); |
| try { |
| key = HDataType.getWritableComparableTypes(t.get(0), keyType); |
| } catch (ExecException e) { |
| throw e; |
| } |
| } |
| |
| pack.attachInput(key, tupIter.iterator()); |
| |
| Result res = pack.getNextTuple(); |
| if(res.returnStatus==POStatus.STATUS_OK){ |
| Tuple packRes = (Tuple)res.result; |
| |
| if(rp.isEmpty()){ |
| context.write(null, packRes); |
| return; |
| } |
| |
| rp.attachInput(packRes); |
| |
| List<PhysicalOperator> leaves = rp.getLeaves(); |
| |
| PhysicalOperator leaf = leaves.get(0); |
| runPipeline(leaf); |
| |
| } |
| |
| if(res.returnStatus==POStatus.STATUS_NULL) { |
| return; |
| } |
| |
| if(res.returnStatus==POStatus.STATUS_ERR){ |
| int errCode = 2093; |
| String msg = "Encountered error in package operator while processing group."; |
| throw new ExecException(msg, errCode, PigException.BUG); |
| } |
| |
| } |
| |
| } |
| |
| } |