| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Properties; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.Mapper; |
| import org.apache.log4j.PropertyConfigurator; |
| import org.apache.pig.PigConstants; |
| import org.apache.pig.PigException; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; |
| import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; |
| import org.apache.pig.data.DataBag; |
| import org.apache.pig.data.SchemaTupleBackend; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.data.TupleFactory; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.PigImplConstants; |
| import org.apache.pig.impl.io.PigNullableWritable; |
| import org.apache.pig.impl.plan.DependencyOrderWalker; |
| import org.apache.pig.impl.plan.OperatorKey; |
| import org.apache.pig.impl.plan.VisitorException; |
| import org.apache.pig.impl.util.ObjectSerializer; |
| import org.apache.pig.impl.util.Pair; |
| import org.apache.pig.impl.util.SpillableMemoryManager; |
| import org.apache.pig.impl.util.Utils; |
| import org.apache.pig.tools.pigstats.PigStatusReporter; |
| |
| /** |
| * This class is the base class for PigMapBase, which has slightly |
| * difference among different versions of hadoop. PigMapBase implementation |
| * is located in $PIG_HOME/shims. |
| **/ |
| public abstract class PigGenericMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> { |
| |
| private final Log log = LogFactory.getLog(getClass()); |
| |
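    // type of the map output key, deserialized from "pig.map.keytype" in setup()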
| protected byte keyType; |
| |
| //Map Plan |
| protected PhysicalPlan mp = null; |
| |
| // Store operators |
| protected List<POStore> stores; |
| |
| protected TupleFactory tf = TupleFactory.getInstance(); |
| |
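    // true when this task is being run by the illustrator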
| boolean inIllustrator = false; |
| |
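    // the mapper context, cached in map() so that runPipeline() can collect into it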
| Context outputCollector; |
| |
    // Reporter that will be used by operators
    // to transmit heartbeats
| ProgressableReporter pigReporter; |
| |
| protected boolean errorInMap = false; |
| |
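    // roots of the map plan, to which each input tuple is attached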
| PhysicalOperator[] roots; |
| |
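    // leaf of the map plan, from which output tuples are pulled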
| private PhysicalOperator leaf; |
| |
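    // whether the lazy per-task initialization in map() has run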
| private volatile boolean initialized = false; |
| |
| /** |
     * Sets the map plan directly, for local map/reduce simulation.
| * @param plan the map plan |
| */ |
| public void setMapPlan(PhysicalPlan plan) { |
| mp = plan; |
| } |
| |
| /** |
     * Called once all the tuples in the input have been
     * processed, so the reporter thread should be closed.
| */ |
| @Override |
| public void cleanup(Context context) throws IOException, InterruptedException { |
| super.cleanup(context); |
| if(errorInMap) { |
| //error in map - returning |
| return; |
| } |
| |
| if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true") && !mp.isEmpty()) { |
            // If there is a stream in the pipeline, or if this map job
            // belongs to a merge-join, we could potentially have more to
            // process - so set the flag stating that all map input has
            // already been sent and then run the pipeline one more time.
            // This results in nothing happening when the pipeline
            // contains neither a stream nor a merge-join.
| mp.endOfAllInput = true; |
| runPipeline(leaf); |
| } |
| |
| if (!inIllustrator) { |
| for (POStore store: stores) { |
| if (!initialized) { |
| MapReducePOStoreImpl impl |
| = new MapReducePOStoreImpl(context); |
| store.setStoreImpl(impl); |
| store.setUp(); |
| } |
| store.tearDown(); |
| } |
| } |
| |
| //Calling EvalFunc.finish() |
| UDFFinishVisitor finisher = new UDFFinishVisitor(mp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(mp)); |
| try { |
| finisher.visit(); |
| } catch (VisitorException e) { |
| int errCode = 2121; |
| String msg = "Error while calling finish method on UDFs."; |
| throw new VisitorException(msg, errCode, PigException.BUG, e); |
| } |
| |
| mp = null; |
| |
| PhysicalOperator.setReporter(null); |
| initialized = false; |
| } |
| |
| /** |
     * Configures the mapper with the map plan and the
     * reporter thread.
| */ |
| @SuppressWarnings("unchecked") |
| @Override |
| public void setup(Context context) throws IOException, InterruptedException { |
| super.setup(context); |
| |
| Configuration job = context.getConfiguration(); |
| SpillableMemoryManager.getInstance().configure(job); |
| context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId())); |
| PigMapReduce.sJobContext = context; |
| PigMapReduce.sJobConfInternal.set(context.getConfiguration()); |
| PigMapReduce.sJobConf = context.getConfiguration(); |
| inIllustrator = inIllustrator(context); |
| |
| PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list"))); |
| |
| // This attempts to fetch all of the generated code from the distributed cache, and resolve it |
| SchemaTupleBackend.initialize(job); |
| |
| Properties log4jProperties = (Properties) ObjectSerializer |
| .deserialize(job.get(PigImplConstants.PIG_LOG4J_PROPERTIES)); |
| if (log4jProperties != null) { |
| PropertyConfigurator.configure(log4jProperties); |
| } |
| |
        if (mp == null) {
            mp = (PhysicalPlan) ObjectSerializer.deserialize(
                    job.get("pig.mapPlan"));
        }
| stores = PlanHelper.getPhysicalOperators(mp, POStore.class); |
| |
| // To be removed |
        if (mp.isEmpty()) {
            log.debug("Map Plan empty!");
        } else {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            mp.explain(baos);
            log.debug(baos.toString());
        }
| keyType = ((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0]; |
| // till here |
| |
| pigReporter = new ProgressableReporter(); |
| // Get the UDF specific context |
| MapRedUtil.setupUDFContext(job); |
| |
| if(!(mp.isEmpty())) { |
| |
| PigSplit split = (PigSplit)context.getInputSplit(); |
| List<OperatorKey> targetOpKeys = split.getTargetOps(); |
| |
| ArrayList<PhysicalOperator> targetOpsAsList = new ArrayList<PhysicalOperator>(); |
| for (OperatorKey targetKey : targetOpKeys) { |
| targetOpsAsList.add(mp.getOperator(targetKey)); |
| } |
            roots = targetOpsAsList.toArray(new PhysicalOperator[targetOpsAsList.size()]);
| leaf = mp.getLeaves().get(0); |
| } |
| |
| PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); |
| pigStatusReporter.setContext(new MRTaskContext(context)); |
| |
| log.info("Aliases being processed per job phase (AliasName[line,offset]): " + job.get("pig.alias.location")); |
| |
| Utils.setDefaultTimeZone(PigMapReduce.sJobConfInternal.get()); |
| } |
| |
| /** |
     * The map function attaches the input tuple to the plan roots and
     * executes the map plan if it is not empty; if the plan is empty,
     * the input is collected directly into the output collector. The
     * collection itself is left abstract for the map-only or map-reduce
     * job to implement: map-only collects the tuple as-is, whereas
     * map-reduce collects it after extracting the key and indexed
     * tuple.
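     * <p>
     * In outline, the logic is (a simplified sketch of the code below):
     * <pre>{@code
     * if (mp.isEmpty()) {
     *     collect(context, inpTuple);        // pass the input through
     * } else {
     *     for (PhysicalOperator root : roots) {
     *         root.attachInput(...);         // feed the plan roots
     *     }
     *     runPipeline(leaf);                 // pull results from the leaf
     * }
     * }</pre>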
| */ |
| @Override |
| protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException { |
| if(!initialized) { |
| initialized = true; |
            // cache the collector for use in runPipeline(), which
            // can also be called from cleanup()
| this.outputCollector = context; |
| pigReporter.setRep(context); |
| PhysicalOperator.setReporter(pigReporter); |
| |
| boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning")); |
| PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); |
| pigStatusReporter.setContext(new MRTaskContext(context)); |
| PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); |
| pigHadoopLogger.setReporter(pigStatusReporter); |
| pigHadoopLogger.setAggregate(aggregateWarning); |
| PhysicalOperator.setPigLogger(pigHadoopLogger); |
| |
| if (!inIllustrator) { |
| for (POStore store: stores) { |
| MapReducePOStoreImpl impl |
| = new MapReducePOStoreImpl(context); |
| store.setStoreImpl(impl); |
| store.setUp(); |
| } |
| } |
| } |
| |
| if (mp.isEmpty()) { |
| collect(context,inpTuple); |
| return; |
| } |
| |
| for (PhysicalOperator root : roots) { |
| if (inIllustrator) { |
| if (root != null) { |
| root.attachInput(inpTuple); |
| } |
| } else { |
| root.attachInput(tf.newTupleNoCopy(inpTuple.getAll())); |
| } |
| } |
| |
| runPipeline(leaf); |
| } |
| |
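    /**
     * Pulls tuples from the given leaf of the map plan until it reports
     * end-of-plan, passing each STATUS_OK result to {@link #collect}.
     * STATUS_NULL results are skipped, and STATUS_ERR aborts the task
     * with an ExecException.
     *
     * @param leaf the leaf operator of the map plan to pull from
     */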
    protected void runPipeline(PhysicalOperator leaf) throws IOException, InterruptedException {
        while (true) {
            Result res = leaf.getNextTuple();
            if (res.returnStatus == POStatus.STATUS_OK) {
                collect(outputCollector, (Tuple) res.result);
                continue;
            }

            if (res.returnStatus == POStatus.STATUS_EOP) {
                return;
            }

            if (res.returnStatus == POStatus.STATUS_NULL) {
                continue;
            }

            if (res.returnStatus == POStatus.STATUS_ERR) {
                // remember that we had an issue so that in
                // cleanup() we can do the right thing
                errorInMap = true;
                // use the error message carried by the result, if any
                String errMsg;
                if (res.result != null) {
                    errMsg = "Received Error while " +
                            "processing the map plan: " + res.result;
                } else {
                    errMsg = "Received Error while " +
                            "processing the map plan.";
                }

                int errCode = 2055;
                throw new ExecException(errMsg, errCode, PigException.BUG);
            }
        }
    }
| |
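    /**
     * Collects a tuple produced by the map pipeline into the output.
     * Map-only jobs collect the tuple as-is, whereas map-reduce jobs
     * collect it after extracting the key and indexed tuple.
     *
     * @param oc the mapper context to collect into
     * @param tuple the tuple to collect
     */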
| abstract public void collect(Context oc, Tuple tuple) throws InterruptedException, IOException; |
| |
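    /**
     * @return true if this mapper is running under the illustrator
     */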
| abstract public boolean inIllustrator(Context context); |
| |
| /** |
| * @return the keyType |
| */ |
| public byte getKeyType() { |
| return keyType; |
| } |
| |
| /** |
| * @param keyType the keyType to set |
| */ |
| public void setKeyType(byte keyType) { |
| this.keyType = keyType; |
| } |
| |
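    /**
     * Creates a mapper context for the illustrator, wired to read the
     * given input bag and capture emitted records in the output list.
     */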
| abstract public Context getIllustratorContext(Configuration conf, DataBag input, |
| List<Pair<PigNullableWritable, Writable>> output, InputSplit split) |
| throws IOException, InterruptedException; |
| } |