/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.pen;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.eval.FilterSpec;
import org.apache.pig.impl.logicalLayer.LOCogroup;
import org.apache.pig.impl.logicalLayer.LOEval;
import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.OperatorKey;
import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.LineageTracer;
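
/**
 * Helpers for the example generator ("pen") that shrink the example base data
 * while keeping enough lineage to illustrate every operator in the plan.
 * TrimLineages walks the logical plan top-down, carrying an "affinity group"
 * of tuples whose lineage must survive; PruneBaseData then greedily keeps only
 * the cheapest lineage groups that still cover all equivalence classes.
 */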
public class ShapeLineage {
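/**
 * Entry point: seeds the affinity groups (slot 0 holds the tuples to keep,
 * slot 1 holds tuples reached through flattening) and recurses from the root.
 * Returns a fresh map from each LOLoad to its trimmed base data.
 */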
static Map<LOLoad, DataBag> TrimLineages(LogicalOperator root, Map<LOLoad, DataBag> baseData, Map<LogicalOperator, DataBag> derivedData, LineageTracer lineage, Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses, Map<OperatorKey, OperatorKey> logicalToPhysicalKeys, Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) throws IOException {
Map <LOLoad, DataBag> modifiedBaseData = new HashMap<LOLoad, DataBag>();
List<IdentityHashSet<Tuple>> affinityGroups = new LinkedList<IdentityHashSet<Tuple>>();
IdentityHashSet<Tuple> a1 = new IdentityHashSet<Tuple>();
affinityGroups.add(a1);
IdentityHashSet<Tuple> a2 = new IdentityHashSet<Tuple>();
affinityGroups.add(a2);
TrimLineages(root, root, modifiedBaseData, derivedData, affinityGroups, lineage, OperatorToEqClasses, 0.0, logicalToPhysicalKeys, physicalOpTable);
return modifiedBaseData;
}
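/**
 * Recursive step: translates the affinity group through currentOp onto its
 * input(s) (keeping a failing record for filters, and one or two cheap
 * representatives per group for GROUP/COGROUP) until an LOLoad is reached,
 * at which point the surviving tuples are recorded as base data.
 */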
static void TrimLineages(LogicalOperator root, LogicalOperator currentOp, Map<LOLoad, DataBag> baseData, Map<LogicalOperator, DataBag> derivedData, List<IdentityHashSet<Tuple>> affinityGroups, LineageTracer lineage, Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses, double completeness, Map<OperatorKey, OperatorKey> logicalToPhysicalKeys, Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) throws IOException {
//With parent-child relationships tracked by the LineageTracer, only filters need special handling: we must also retain a record that does NOT satisfy the filter condition.
IdentityHashSet<Tuple> affinityGroup = affinityGroups.get(0);
if(affinityGroup.isEmpty()) {
//empty affinity group: this is the first operator (the root) of the logical plan
DataBag bag = derivedData.get(currentOp);
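//seed with the root's output: add tuples until (and including) the first one found in the flatten lineage; if none is, keep them all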
for(Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
Tuple t = it.next();
if(lineage.flattenLineageContains(t)) {
affinityGroup.add(t);
break;
}
affinityGroup.add(t);
}
}
//members of the affinity group that appear in the flatten lineage need special handling via their flatten children
IdentityHashSet<Tuple> flattenElements = affinityGroups.get(1);
IdentityHashSet<Tuple> newAffinityGroup = new IdentityHashSet<Tuple>();
if(currentOp instanceof LOEval) {
EvalSpec spec = ((LOEval)currentOp).getSpec();
if(spec instanceof FilterSpec) {
//put a tuple that does NOT pass the filter into the affinity group, so the trimmed example shows both outcomes
LogicalOperator input = currentOp.getOpTable().get(currentOp.getInputs().get(0)); //since it's a filter, there is exactly one input
DataBag inputData = derivedData.get(input);
for(Iterator<Tuple> it = inputData.iterator(); it.hasNext();) {
Tuple t = it.next();
if(!((FilterSpec)spec).cond.eval(t)) {
newAffinityGroup.add(t);
break;
}
}
}
for(Tuple t : affinityGroup) {
if(lineage.flattenLineageContains(t)) {
flattenElements.addAll(lineage.getFlattenChildren(t));
}
newAffinityGroup.addAll(lineage.getChildren(t));
}
//the affinity group for this eval's input is ready; recurse into the (single) input
affinityGroup.clear();
affinityGroup.addAll(newAffinityGroup);
TrimLineages(root, currentOp.getOpTable().get(currentOp.getInputs().get(0)), baseData, derivedData, affinityGroups, lineage, OperatorToEqClasses, completeness, logicalToPhysicalKeys, physicalOpTable);
}
if(currentOp instanceof LOCogroup) {
List<OperatorKey> inputs = currentOp.getInputs();
int numInputs = inputs.size();
IdentityHashSet<Tuple> flattenAffinityGroup = affinityGroups.get(1);
if(numInputs == 1) {
//single input: a plain GROUP
for(Tuple t : affinityGroup) {
//reset the scores for each tuple; carrying them across iterations could leave a stale 'best' index pointing past a shorter children list
double score = 0;
double bestScore = -1;
int best = 0;
double nextBestScore = -1;
int nextBest = 0;
int index = 0;
List<Tuple> children = lineage.getChildren(t);
if(children.size() == 1) {
//only one child: it must be kept (the set ignores duplicates)
newAffinityGroup.add(children.get(0));
} else {
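//if flattened tuples are pending, they are the children we must keep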
if(flattenAffinityGroup.size() > 0) {
newAffinityGroup.addAll(flattenAffinityGroup);
flattenAffinityGroup.clear();
} else {
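//otherwise keep the two children whose lineage groups are cheapest to retain (score = 1 / weighted count)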
for(Tuple child : children) {
score = 1 / lineage.getWeightedCounts(child, 2, 1);
if(score > bestScore) {
nextBest = best;
best = index;
nextBestScore = bestScore;
bestScore = score;
} else if(score > nextBestScore) {
nextBest = index;
nextBestScore = score;
}
index++;
}
newAffinityGroup.add(children.get(best));
newAffinityGroup.add(children.get(nextBest));
}
}
}
affinityGroup.clear();
affinityGroup.addAll(newAffinityGroup);
TrimLineages(root, currentOp.getOpTable().get(inputs.get(0)), baseData, derivedData, affinityGroups, lineage, OperatorToEqClasses, completeness, logicalToPhysicalKeys, physicalOpTable);
} else {
//multiple inputs: a COGROUP; pick representatives from each input's bag (field 0 is the group key)
newAffinityGroup.addAll(affinityGroup);
for(int i = 0; i < numInputs; i++) {
IdentityHashSet<Tuple> cogroupAffinityGroup = new IdentityHashSet<Tuple>();
for(Tuple t : newAffinityGroup) {
DataBag data = t.getBagField(i+1);
//synthetic data generation should ensure no bag field is empty, but skip empty ones defensively
if(data.size() == 0)
continue;
if(flattenAffinityGroup.size() > 0) {
for(Iterator<Tuple> it = data.iterator(); it.hasNext(); ) {
Tuple child = it.next();
if(flattenAffinityGroup.contains(child)) {
cogroupAffinityGroup.add(child);
flattenAffinityGroup.remove(child);
}
}
} else {
//The children are all the tuples present in data
Tuple best = null;
double bestScore = -1;
for(Iterator<Tuple> it = data.iterator(); it.hasNext(); ) {
Tuple child = it.next();
double score = 1 / lineage.getWeightedCounts(child, 2, 1);
if(score > bestScore) {
best = child;
bestScore = score;
}
}
cogroupAffinityGroup.add(best);
}
}
affinityGroup.clear();
affinityGroup.addAll(cogroupAffinityGroup);
TrimLineages(root, currentOp.getOpTable().get(inputs.get(i)), baseData, derivedData, affinityGroups, lineage, OperatorToEqClasses, completeness, logicalToPhysicalKeys, physicalOpTable);
}
}
}
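//reached a load: record the surviving affinity group as base data for this input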
if(currentOp instanceof LOLoad) {
getBaseData(currentOp, affinityGroup, lineage, baseData, derivedData);
}
}
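/**
 * Merges the affinity group for this load into baseData, taking the union
 * with any tuples recorded by an earlier visit so nothing is added twice.
 */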
static void getBaseData(LogicalOperator lOp, IdentityHashSet<Tuple> affinityGroup, LineageTracer lineage, Map<LOLoad, DataBag> baseData, Map<LogicalOperator, DataBag> derivedData) {
DataBag bag = baseData.get(lOp);
IdentityHashSet<Tuple> temp = new IdentityHashSet<Tuple>();
if(bag == null) {
bag = BagFactory.getInstance().newDefaultBag();
baseData.put((LOLoad) lOp, bag);
} else {
//the bag may already contain tuples from an earlier visit to this load;
//rebuild it as the union of its old contents and the new affinity group
for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
temp.add(it.next());
}
bag.clear();
}
for(Tuple t : affinityGroup) {
temp.add(t);
}
for(Tuple t : temp) {
bag.add(t);
}
}
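/**
 * Greedy weighted set cover: repeatedly picks the lineage group with the
 * best (classes covered / weight) score until every equivalence class is
 * covered, then drops all base-data tuples outside the selected groups.
 */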
static Map<LOLoad, DataBag> PruneBaseData(Map<LOLoad, DataBag> baseData, DataBag rootOutput, IdentityHashSet<Tuple> syntheticTuples, LineageTracer lineage, Collection<IdentityHashSet<Tuple>> equivalenceClasses) {
IdentityHashMap<Tuple, Collection<Tuple>> membershipMap = lineage.getMembershipMap();
IdentityHashMap<Tuple, Double> lineageGroupWeights = lineage.getWeightedCounts(2f, 1);
// compute a mapping from lineage group to the set of equivalence classes covered by it
IdentityHashMap<Tuple, Set<Integer>> lineageGroupToEquivClasses = new IdentityHashMap<Tuple, Set<Integer>>();
int equivClassId = 0;
for (IdentityHashSet<Tuple> equivClass : equivalenceClasses) {
for (Tuple t : equivClass) {
Tuple lineageGroup = lineage.getRepresentative(t);
Set<Integer> entry = lineageGroupToEquivClasses.get(lineageGroup);
if (entry == null) {
entry = new HashSet<Integer>();
lineageGroupToEquivClasses.put(lineageGroup, entry);
}
entry.add(equivClassId);
}
equivClassId++;
}
// select lineage groups such that we cover all equivalence classes
IdentityHashSet<Tuple> selectedLineageGroups = new IdentityHashSet<Tuple>();
while (!lineageGroupToEquivClasses.isEmpty()) {
// greedily find the lineage group with the best "score", where score = # equiv classes covered / group weight
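// e.g. a group of weight 4.0 covering 3 remaining classes scores 3/4 = 0.75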
double bestScore = -1;
Tuple bestLineageGroup = null;
Set<Integer> bestEquivClassesCovered = null;
for (Tuple lineageGroup : lineageGroupToEquivClasses.keySet()) {
double weight = lineageGroupWeights.get(lineageGroup);
Set<Integer> equivClassesCovered = lineageGroupToEquivClasses.get(lineageGroup);
int numEquivClassesCovered = equivClassesCovered.size();
double score = ((double) numEquivClassesCovered) / ((double)weight);
if (score > bestScore) {
bestScore = score;
bestLineageGroup = lineageGroup;
bestEquivClassesCovered = equivClassesCovered;
}
}
// add the best-scoring lineage group to the set of ones we plan to retain
selectedLineageGroups.add(bestLineageGroup);
// copy bestEquivClassesCovered: it references a set stored in the map, and the removeAll loop below mutates those sets
bestEquivClassesCovered = new HashSet<Integer>(bestEquivClassesCovered);
// remove the classes we've now covered
Collection<Tuple> toRemove = new LinkedList<Tuple>();
for (Tuple lineageGroup : lineageGroupToEquivClasses.keySet()) {
Set<Integer> equivClasses = lineageGroupToEquivClasses.get(lineageGroup);
equivClasses.removeAll(bestEquivClassesCovered);
if (equivClasses.size() == 0) toRemove.add(lineageGroup);
}
for (Tuple removeMe : toRemove) lineageGroupToEquivClasses.remove(removeMe);
}
// revise baseData to only contain the tuples that are part of selectedLineageGroups
IdentityHashSet<Tuple> tuplesToRetain = new IdentityHashSet<Tuple>();
for (Tuple lineageGroup : selectedLineageGroups) {
Collection<Tuple> members = membershipMap.get(lineageGroup);
for (Tuple t : members) tuplesToRetain.add(t);
}
Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
for (LOLoad loadOp : baseData.keySet()) {
DataBag data = baseData.get(loadOp);
DataBag newData = BagFactory.getInstance().newDefaultBag();
for (Iterator<Tuple> it = data.iterator(); it.hasNext(); ) {
Tuple t = it.next();
if (tuplesToRetain.contains(t)) newData.add(t);
}
newBaseData.put(loadOp, newData);
}
return newBaseData;
}
}