/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.pen;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.pen.util.DisplayExamples;
import org.apache.pig.pen.util.LineageTracer;

/**
 * Generates example tuples for the ILLUSTRATE command by running the
 * script over trimmed and augmented samples of its input data.
 */
public class ExampleGenerator {
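// Typical usage (a minimal sketch; the surrounding plan and context setup
// are assumed):
//   ExampleGenerator eg = new ExampleGenerator(logicalPlan, pigContext);
//   Map<Operator, DataBag> examples = eg.getExamples();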
LogicalPlan plan;
LogicalPlan newPlan;
Map<LOLoad, DataBag> baseData = null;
PigContext pigContext;
PhysicalPlan physPlan;
PhysicalPlanResetter physPlanResetter;
private MRExecutionEngine execEngine;
private LocalMapReduceSimulator localMRRunner;
Log log = LogFactory.getLog(getClass());
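// cap on the number of records sampled from the inputs; adjustable via setMaxRecords()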
private int MAX_RECORDS = 10000;
private Map<Operator, PhysicalOperator> logToPhyMap;
private Map<PhysicalOperator, Operator> poLoadToLogMap;
private Map<PhysicalOperator, Operator> poToLogMap;
private HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap;
private LineageTracer lineage;
private Map<Operator, DataBag> logToDataMap = null;
private Map<LOForEach, Map<LogicalRelationalOperator, DataBag>> forEachInnerLogToDataMap;
Map<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> forEachInnerLogToPhyMap;
Map<LOLimit, Long> oriLimitMap = null;
Map<POLoad, LogicalSchema> poLoadToSchemaMap;
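/**
 * Builds a generator for the given logical plan: connects the supplied
 * context and sets up the MR execution engine and the local map-reduce
 * simulator that runs the (possibly trimmed) plans over sample data.
 */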
public ExampleGenerator(LogicalPlan plan, PigContext hadoopPigContext) {
this.plan = plan;
// pigContext = new PigContext(ExecType.LOCAL, hadoopPigContext
// .getProperties());
pigContext = hadoopPigContext;
// pigContext.setExecType(ExecType.LOCAL);
FileLocalizer.setInitialized(false);
try {
pigContext.connect();
} catch (ExecException e) {
log.error("Error connecting to the cluster "
+ e.getLocalizedMessage());
}
execEngine = new MRExecutionEngine(pigContext);
localMRRunner = new LocalMapReduceSimulator();
poLoadToSchemaMap = new HashMap<POLoad, LogicalSchema>();
}
public LineageTracer getLineage() {
return lineage;
}
public Map<Operator, PhysicalOperator> getLogToPhyMap() {
return logToPhyMap;
}
public void setMaxRecords(int max) {
MAX_RECORDS = max;
}
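/**
 * Drives ILLUSTRATE: compiles the plan, reads the base data, then
 * alternates lineage-based trimming with augmentation by synthetic tuples
 * to derive a small but complete set of example tuples per operator.
 * Prints the examples in tabular form and returns them keyed by logical
 * operator.
 */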
public Map<Operator, DataBag> getExamples() throws IOException, InterruptedException {
if (pigContext.getProperties().getProperty("pig.usenewlogicalplan", "true").equals("false"))
throw new ExecException("ILLUSTRATE must use the new logical plan!");
pigContext.inIllustrator = true;
physPlan = compilePlan(plan);
physPlanResetter = new PhysicalPlanResetter(physPlan);
List<Operator> loads = newPlan.getSources();
List<PhysicalOperator> pRoots = physPlan.getRoots();
if (loads.size() != pRoots.size())
throw new ExecException("Logical and Physical plans have different number of roots");
logToPhyMap = execEngine.getLogToPhyMap();
forEachInnerLogToPhyMap = execEngine.getForEachInnerLogToPhyMap(plan);
poLoadToLogMap = new HashMap<PhysicalOperator, Operator>();
logToDataMap = new HashMap<Operator, DataBag>();
poToLogMap = new HashMap<PhysicalOperator, Operator>();
// set up foreach inner data map
forEachInnerLogToDataMap = new HashMap<LOForEach, Map<LogicalRelationalOperator, DataBag>>();
for (Map.Entry<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> entry : forEachInnerLogToPhyMap.entrySet()) {
Map<LogicalRelationalOperator, DataBag> innerMap = new HashMap<LogicalRelationalOperator, DataBag>();
forEachInnerLogToDataMap.put(entry.getKey(), innerMap);
}
for (Operator load : loads) {
poLoadToLogMap.put(logToPhyMap.get(load), load);
}
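// build the reverse PO-to-LO map and remember whether the plan contains a
// LIMIT, which needs an extra derivation pass after augmentation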
boolean hasLimit = false;
for (Operator lo : logToPhyMap.keySet()) {
poToLogMap.put(logToPhyMap.get(lo), lo);
if (!hasLimit && lo instanceof LOLimit)
hasLimit = true;
}
try {
readBaseData(loads);
} catch (ExecException e) {
log.error("Error reading data. " + e.getMessage());
throw e;
} catch (FrontendException e) {
log.error("Error reading data. " + e.getMessage());
throw new RuntimeException(e);
}
Map<Operator, DataBag> derivedData = null;
// create derived data and trim base data
LineageTrimmingVisitor trimmer = new LineageTrimmingVisitor(newPlan,
baseData, this, logToPhyMap, physPlan, pigContext);
trimmer.visit();
baseData = trimmer.getBaseData();
// System.out.println(
// "Obtained the first level derived and trimmed data");
// create new derived data from trimmed basedata
derivedData = getData(physPlan);
// System.out.println(
// "Got new derived data from the trimmed base data");
// augment base data
AugmentBaseDataVisitor augment = new AugmentBaseDataVisitor(newPlan,
logToPhyMap, baseData, derivedData);
augment.visit();
this.baseData = augment.getNewBaseData();
// System.out.println("Obtained augmented base data");
// create new derived data and trim the base data after augmenting
// base data with synthetic tuples
trimmer = new LineageTrimmingVisitor(newPlan, baseData, this,
logToPhyMap, physPlan, pigContext);
trimmer.visit();
baseData = trimmer.getBaseData();
// System.out.println("Final trimming");
// create the final version of derivedData to give to the output
derivedData = getData(physPlan);
// System.out.println("Obtaining final derived data for output");
if (hasLimit) {
augment.setLimit();
augment.visit();
this.baseData = augment.getNewBaseData();
oriLimitMap = augment.getOriLimitMap();
derivedData = getData();
}
// DisplayExamples.printSimple(plan.getLeaves().get(0),
// derivedData.derivedData);
System.out.println(DisplayExamples.printTabular(newPlan,
derivedData, forEachInnerLogToDataMap));
pigContext.inIllustrator = false;
return derivedData;
}
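/**
 * Materializes the base data by running only the LOAD operators; every
 * load must declare a schema so synthetic tuples can be built later.
 */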
private void readBaseData(List<Operator> loads) throws IOException, InterruptedException, FrontendException, ExecException {
PhysicalPlan thisPhyPlan = new PhysicalPlan();
for (Operator op : loads) {
LogicalSchema schema = ((LOLoad) op).getSchema();
if(schema == null) {
throw new ExecException("Example Generator requires a schema. Please provide a schema while loading data.");
}
poLoadToSchemaMap.put((POLoad)logToPhyMap.get(op), schema);
thisPhyPlan.add(logToPhyMap.get(op));
}
baseData = null;
Map<Operator, DataBag> result = getData(thisPhyPlan);
baseData = new HashMap<LOLoad, DataBag>();
for (Operator lo : result.keySet()) {
if (lo instanceof LOLoad) {
baseData.put((LOLoad) lo, result.get(lo));
}
}
}
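/**
 * Keeps an unoptimized copy of the logical plan for reporting, then
 * optimizes the original and compiles it into a physical plan.
 */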
PhysicalPlan compilePlan(LogicalPlan plan) throws ExecException, FrontendException {
newPlan = new LogicalPlan(plan);
plan.optimize(pigContext);
PhysicalPlan result = execEngine.compile(plan, null);
return result;
}
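/** Re-derives example data over the full physical plan. */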
public Map<Operator, DataBag> getData() throws IOException, InterruptedException {
return getData(physPlan);
}
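/**
 * Attaches Illustrator instances to the given (possibly trimmed) physical
 * plan, runs it through the local map-reduce simulator, and collects the
 * per-operator data bags, lineage, and tuple equivalence classes.
 */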
private Map<Operator, DataBag> getData(PhysicalPlan plan) throws PigException, IOException, InterruptedException {
// get data on a physical plan possibly trimmed of one branch
lineage = new LineageTracer();
IllustratorAttacher attacher = new IllustratorAttacher(plan, lineage, MAX_RECORDS, poLoadToSchemaMap, pigContext);
attacher.visit();
if (oriLimitMap != null) {
for (Map.Entry<LOLimit, Long> entry : oriLimitMap.entrySet()) {
logToPhyMap.get(entry.getKey()).getIllustrator().setOriginalLimit(entry.getValue());
}
}
getLogToDataMap(attacher.getDataMap());
if (baseData != null) {
setLoadDataMap();
physPlanResetter.visit();
}
localMRRunner.launchPig(plan, baseData, lineage, attacher, this, pigContext);
if (baseData == null)
poToEqclassesMap = attacher.poToEqclassesMap;
else {
for (Map.Entry<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> entry : attacher.poToEqclassesMap.entrySet()) {
if(!(entry.getKey() instanceof POLoad))
poToEqclassesMap.put(entry.getKey(), entry.getValue());
}
}
// remap only when re-deriving over existing base data (not during
// base-data generation), since the MR compilation may have replaced
// physical operators in the MR plans
if (baseData != null)
phyToMRTransform(plan, attacher.getDataMap());
return logToDataMap;
}
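/** Re-derives example data after swapping in new base data. */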
public Map<Operator, DataBag> getData(Map<LOLoad, DataBag> newBaseData) throws Exception {
baseData = newBaseData;
return getData(physPlan);
}
private void phyToMRTransform(PhysicalPlan plan, Map<PhysicalOperator, DataBag> phyToDataMap) {
// remap each logical operator's data and equivalence classes, since the MR compilation may have replaced physical operators in the MR plans
Map<PhysicalOperator, PhysicalOperator> phyToMRMap = localMRRunner.getPhyToMRMap();
for (Map.Entry<PhysicalOperator, Operator> entry : poToLogMap.entrySet()) {
if (phyToMRMap.get(entry.getKey()) != null) {
PhysicalOperator poInMR = phyToMRMap.get(entry.getKey());
logToDataMap.put(entry.getValue(), phyToDataMap.get(poInMR));
poToEqclassesMap.put(entry.getKey(), poToEqclassesMap.get(poInMR));
}
}
}
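/**
 * Rebuilds the logical-operator-to-data map, including the maps for the
 * ForEach inner plans, from the collected physical-operator results.
 */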
private void getLogToDataMap(Map<PhysicalOperator, DataBag> phyToDataMap) {
logToDataMap.clear();
for (Operator lo : logToPhyMap.keySet()) {
if (logToPhyMap.get(lo) != null)
logToDataMap.put(lo, phyToDataMap.get(logToPhyMap.get(lo)));
}
// set the LO-to-Data mapping for the ForEach inner plans
for (Map.Entry<LOForEach, Map<LogicalRelationalOperator, DataBag>> entry : forEachInnerLogToDataMap.entrySet()) {
entry.getValue().clear();
for (Map.Entry<LogicalRelationalOperator, PhysicalOperator> innerEntry : forEachInnerLogToPhyMap.get(entry.getKey()).entrySet()) {
entry.getValue().put(innerEntry.getKey(), phyToDataMap.get(innerEntry.getValue()));
}
}
}
private void setLoadDataMap() {
// Seeds the LO-to-data map, equivalence classes, and lineage with the base
// data for the upcoming run; call only after logToDataMap has been
// (re)built and before the runner starts.
if (baseData != null) {
if (poToEqclassesMap == null)
poToEqclassesMap = new HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>>();
else
poToEqclassesMap.clear();
for (LOLoad lo : baseData.keySet()) {
logToDataMap.get(lo).addAll(baseData.get(lo));
LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
equivalenceClasses.add(equivalenceClass);
for (Tuple t : baseData.get(lo)) {
lineage.insert(t);
equivalenceClass.add(t);
}
poToEqclassesMap.put(logToPhyMap.get(lo), equivalenceClasses);
}
}
}
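/** Flattens the per-operator equivalence classes into one collection. */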
public Collection<IdentityHashSet<Tuple>> getEqClasses() {
Map<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> logToEqclassesMap = getLoToEqClassMap();
LinkedList<IdentityHashSet<Tuple>> ret = new LinkedList<IdentityHashSet<Tuple>>();
for (Map.Entry<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> entry :
logToEqclassesMap.entrySet()) {
if (entry.getValue() != null)
ret.addAll(entry.getValue());
}
return ret;
}
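/**
 * Maps each logical operator to its tuple equivalence classes, then
 * post-processes SORT operators: classes that are singletons or hold only
 * identical tuples carry no ordering information and are emptied.
 */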
public Map<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> getLoToEqClassMap() {
Map<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> ret =
EquivalenceClasses.getLoToEqClassMap(physPlan, newPlan, logToPhyMap, logToDataMap, forEachInnerLogToPhyMap, poToEqclassesMap);
// eq classes adjustments based upon logical operators
for (Map.Entry<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> entry : ret.entrySet()) {
if (entry.getKey() instanceof LOSort) {
Collection<IdentityHashSet<Tuple>> eqClasses = entry.getValue();
for (Iterator<IdentityHashSet<Tuple>> it = eqClasses.iterator(); it.hasNext(); ) {
Tuple t = null;
IdentityHashSet<Tuple> eqClass = it.next();
if (eqClass.size() == 1) {
eqClass.clear();
continue;
}
boolean first = true, allIdentical = true;
for (Iterator<Tuple> it1 = eqClass.iterator(); it1.hasNext();)
{
if (first) {
first = false;
t = it1.next();
} else {
if (!it1.next().equals(t)) {
allIdentical = false;
break;
}
}
}
if (allIdentical)
eqClass.clear();
}
}
}
return ret;
}
}