/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.pen;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduceCounter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.ReadScalars;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.pen.util.LineageTracer;

/**
 * Simulates Map-Reduce execution of a physical plan in-process, so the
 * illustrator (ExampleGenerator) can trace example data through each job
 * without submitting anything to a real Hadoop cluster.
 */
public class LocalMapReduceSimulator {
private MapReduceLauncher launcher = new MapReduceLauncher();
    private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();
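    /**
     * Compiles the given physical plan into MR jobs and simulates each job
     * in-process: map plans are driven by an illustrator mapper context over
     * the example bags in baseData (or the output of earlier jobs), and
     * reduce plans are replayed over the collected intermediate data.
     */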
@SuppressWarnings("unchecked")
public void launchPig(PhysicalPlan php, Map<LOLoad, DataBag> baseData,
LineageTracer lineage,
IllustratorAttacher attacher,
ExampleGenerator eg,
PigContext pc) throws PigException, IOException, InterruptedException {
phyToMRMap.clear();
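        // compile the physical plan into an MR-operator plan, just as the
        // real MapReduceLauncher would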
MROperPlan mrp = launcher.compile(php, pc);
ConfigurationValidator.validatePigProperties(pc.getProperties());
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
JobControl jc;
int numMRJobsCompl = 0;
DataBag input;
List<Pair<PigNullableWritable, Writable>> intermediateData = new ArrayList<Pair<PigNullableWritable, Writable>>();
Map<Job, MapReduceOper> jobToMroMap = jcc.getJobMroMap();
HashMap<String, DataBag> output = new HashMap<String, DataBag>();
Configuration jobConf;
        boolean needFileInput;
final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>();
pc.getProperties().setProperty("pig.illustrating", "true");
String jtIdentifier = "" + System.currentTimeMillis();
int jobId = 0;
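        // simulate one batch of ready jobs per iteration until the MR plan
        // is drained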
        while (mrp.size() != 0) {
            jc = jcc.compile(mrp, "Illustrator");
            // jcc.compile returns null here only when the plan contains a
            // native MR job, which the illustrator cannot simulate
            if (jc == null) {
                throw new ExecException("Native execution is not supported");
            }
List<Job> jobs = jc.getWaitingJobs();
for (Job job : jobs) {
jobId++;
jobConf = job.getJobConf();
FileLocalizer.setInitialized(false);
ArrayList<ArrayList<OperatorKey>> inpTargets =
(ArrayList<ArrayList<OperatorKey>>)
ObjectSerializer.deserialize(jobConf.get("pig.inpTargets"));
intermediateData.clear();
MapReduceOper mro = jobToMroMap.get(job);
PigSplit split = null;
List<POStore> stores = null;
PhysicalOperator pack = null;
// revisit as there are new physical operators from MR compilation
if (!mro.mapPlan.isEmpty())
attacher.revisit(mro.mapPlan);
if (!mro.reducePlan.isEmpty()) {
attacher.revisit(mro.reducePlan);
pack = mro.reducePlan.getRoots().get(0);
}
List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
if (!mro.mapPlan.isEmpty()) {
stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
}
if (!mro.reducePlan.isEmpty()) {
if (stores == null)
stores = PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class);
else
stores.addAll(PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class));
}
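                // expose each store's illustrator output under its file name so
                // later loads (and ReadScalars) can pick it up from memory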
for (POStore store : stores) {
output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store));
}
OutputAttacher oa = new OutputAttacher(mro.mapPlan, output);
oa.visit();
if (!mro.reducePlan.isEmpty()) {
oa = new OutputAttacher(mro.reducePlan, output);
oa.visit();
}
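                // first pass: detach any load whose input is already available
                // in memory, so the map plan is not fed from the file system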
int index = 0;
for (POLoad ld : lds) {
input = output.get(ld.getLFile().getFileName());
if (input == null && baseData != null) {
for (LogicalRelationalOperator lo : baseData.keySet()) {
                        if (((LOLoad) lo).getSchemaFile().equals(ld.getLFile().getFileName())) {
input = baseData.get(lo);
break;
}
}
}
if (input != null)
mro.mapPlan.remove(ld);
}
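                // second pass: run one simulated map task per load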
int mapTaskId = 0;
for (POLoad ld : lds) {
// check newly generated data first
input = output.get(ld.getLFile().getFileName());
                    if (input == null && baseData != null) {
                        for (LogicalRelationalOperator lo : baseData.keySet()) {
                            if (((LOLoad) lo).getSchemaFile().equals(ld.getLFile().getFileName())) {
                                input = baseData.get(lo);
                                break;
                            }
                        }
                    }
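                    // with no in-memory bag the split carries no input targets
                    // and the mapper reads from the actual input file instead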
needFileInput = (input == null);
split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0);
++index;
Mapper<Text, Tuple, PigNullableWritable, Writable> map;
if (mro.reducePlan.isEmpty()) {
// map-only
map = new PigMapOnly.Map();
Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapOnly.Map) map)
.getIllustratorContext(jobConf, input, intermediateData, split);
                        if (mro.isCounterOperation() && mro.isRowNumber()) {
                            // a row-number RANK counter job needs the counting
                            // mapper; guarding on isRowNumber() also keeps this
                            // cast from hitting the PigMapOnly.Map instance
                            map = new PigMapReduceCounter.PigMapCounter();
                            context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split);
                        }
((PigMapBase) map).setMapPlan(mro.mapPlan);
context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
map.run(context);
} else {
if ("true".equals(jobConf.get("pig.usercomparator")))
map = new PigMapReduce.MapWithComparator();
else if (!"".equals(jobConf.get("pig.keyDistFile", "")))
map = new PigMapReduce.MapWithPartitionIndex();
else
map = new PigMapReduce.Map();
Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map)
.getIllustratorContext(jobConf, input, intermediateData, split);
((PigMapBase) map).setMapPlan(mro.mapPlan);
context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
map.run(context);
}
}
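                // reduce phase: replay the intermediate data collected from the
                // simulated mappers through an illustrator reducer context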
                if (!mro.reducePlan.isEmpty()) {
if (pack instanceof POPackage)
mro.reducePlan.remove(pack);
// reducer run
PigMapReduce.Reduce reduce;
if ("true".equals(jobConf.get("pig.usercomparator")))
reduce = new PigMapReduce.ReduceWithComparator();
else
reduce = new PigMapReduce.Reduce();
Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable>.Context
context = reduce.getIllustratorContext(job, intermediateData, (POPackage) pack);
                    if (mro.isCounterOperation()) {
                        reduce = new PigMapReduceCounter.PigReduceCounter();
                        context = ((PigMapReduceCounter.PigReduceCounter) reduce).getIllustratorContext(job, intermediateData, (POPackage) pack);
                    }
                    reduce.setReducePlan(mro.reducePlan);
context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, false, 0).toString());
reduce.run(context);
}
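                // record how MR compilation mapped the original physical
                // operators, for later lookup via getPhyToMRMap()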
                for (PhysicalOperator key : mro.phyToMRMap.keySet()) {
                    for (PhysicalOperator value : mro.phyToMRMap.get(key)) {
                        phyToMRMap.put(key, value);
                    }
                }
}
int removedMROp = jcc.updateMROpPlan(new LinkedList<Job>());
numMRJobsCompl += removedMROp;
}
jcc.reset();
}
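    /**
     * Visitor that hands the in-memory output bags to every ReadScalars UDF
     * in a plan, letting scalar references resolve against illustrator
     * output instead of files.
     */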
private class OutputAttacher extends PhyPlanVisitor {
private Map<String, DataBag> outputBuffer;
OutputAttacher(PhysicalPlan plan, Map<String, DataBag> output) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
plan));
this.outputBuffer = output;
}
@Override
public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
if (userFunc.getFunc() != null && userFunc.getFunc() instanceof ReadScalars) {
((ReadScalars) userFunc.getFunc()).setOutputBuffer(outputBuffer);
}
}
}
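    /**
     * @return the accumulated mapping from original physical operators to
     *         the operators the MR compiler produced for them
     */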
public Map<PhysicalOperator, PhysicalOperator> getPhyToMRMap() {
return phyToMRMap;
}
}