blob: 547c6b49e9eead51df2ac77ed4d514e6e71d1191 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.pen;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.joda.time.DateTime;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.expression.AddExpression;
import org.apache.pig.newplan.logical.expression.AndExpression;
import org.apache.pig.newplan.logical.expression.BinaryExpression;
import org.apache.pig.newplan.logical.expression.CastExpression;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.DivideExpression;
import org.apache.pig.newplan.logical.expression.EqualExpression;
import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
import org.apache.pig.newplan.logical.expression.IsNullExpression;
import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
import org.apache.pig.newplan.logical.expression.LessThanExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ModExpression;
import org.apache.pig.newplan.logical.expression.MultiplyExpression;
import org.apache.pig.newplan.logical.expression.NotEqualExpression;
import org.apache.pig.newplan.logical.expression.NotExpression;
import org.apache.pig.newplan.logical.expression.OrExpression;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.expression.RegexExpression;
import org.apache.pig.newplan.logical.expression.SubtractExpression;
import org.apache.pig.newplan.logical.expression.UserFuncExpression;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LODistinct;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.pen.util.ExampleTuple;
import org.apache.pig.pen.util.PreOrderDepthFirstWalker;
// This visitor is used to generate synthetic data.
// Synthetic data is generated by building constraint tuples for each operator as the plan is
// traversed, and by replacing those constraints with concrete values wherever possible.
// Only simple conditions are handled right now.
public class AugmentBaseDataVisitor extends LogicalRelationalNodesVisitor {
// Existing base data per LOAD, as handed to the constructor.
Map<LOLoad, DataBag> baseData = null;
// Synthetic tuples generated by this visitor, per LOAD.
Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
// Data derived for each operator, as handed to the constructor.
Map<Operator, DataBag> derivedData = null;
// When set, visit(LOLimit) is active and the other visit methods return
// early unless the walker's branch flag is set.
private boolean limit = false;
// Maps logical operators to physical ones; used to look up the POLimit of a LOLimit.
private final Map<Operator, PhysicalOperator> logToPhysMap;
// Limit value each POLimit had before visit(LOLimit) overwrote it; created lazily.
private Map<LOLimit, Long> oriLimitMap;
// Pending constraint bags keyed by operator: each visit method consumes its own
// entry and writes entries for the operator's inputs.
Map<Operator, DataBag> outputConstraintsMap = new HashMap<Operator, DataBag>();
Log log = LogFactory.getLog(getClass());
/**
 * Creates the visitor. Augmentation moves from the leaves of the plan towards
 * the root, so the plan is walked with a {@link PreOrderDepthFirstWalker}.
 *
 * @param plan         the plan to traverse
 * @param logToPhysMap logical-to-physical operator mapping
 * @param baseData     existing base data per LOAD
 * @param derivedData  derived data per operator
 * @throws FrontendException if the superclass constructor fails
 */
public AugmentBaseDataVisitor(
        OperatorPlan plan,
        Map<Operator, PhysicalOperator> logToPhysMap,
        Map<LOLoad, DataBag> baseData,
        Map<Operator, DataBag> derivedData) throws FrontendException {
    super(plan, new PreOrderDepthFirstWalker(plan));
    this.logToPhysMap = logToPhysMap;
    this.derivedData = derivedData;
    this.baseData = baseData;
}
/**
 * Switches the visitor into LIMIT mode: visit(LOLimit) becomes active and the
 * other visit methods return early unless the walker's branch flag is set.
 */
public void setLimit() {
    this.limit = true;
}
/**
 * Returns the synthetic base data combined with the existing base data.
 * <p>
 * Synthetic bags belonging to LOADs that read the same {@link FileSpec} are
 * first widened to a common size: every tuple shorter than the widest leading
 * tuple of any of those bags is padded with that tuple's trailing fields.
 * Afterwards the existing base data of every LOAD is added to its synthetic
 * bag. The bags held by this visitor are modified in place, so the existing
 * data is added again on every call.
 *
 * @return the per-LOAD bags of synthetic plus existing tuples
 * @throws ExecException if a tuple field cannot be read
 */
public Map<LOLoad, DataBag> getNewBaseData() throws ExecException {
    // bucket the synthetic bags by the input they are loaded from
    MultiMap<FileSpec, DataBag> bagsByInput = new MultiMap<FileSpec, DataBag>();
    for (Map.Entry<LOLoad, DataBag> entry : newBaseData.entrySet()) {
        bagsByInput.put(entry.getKey().getFileSpec(), entry.getValue());
    }

    for (FileSpec spec : bagsByInput.keySet()) {
        // pass 1: find the widest leading tuple among the bags of this input
        Tuple widest = null;
        int widestSize = 0;
        for (DataBag candidate : bagsByInput.get(spec)) {
            if (candidate.size() <= 0) {
                continue;
            }
            Tuple first = candidate.iterator().next();
            int firstSize = first.size();
            if (firstSize > widestSize) {
                widestSize = firstSize;
                widest = first;
            }
        }

        // pass 2: pad every narrower tuple with the widest tuple's trailing fields
        for (DataBag candidate : bagsByInput.get(spec)) {
            if (candidate.size() <= 0) {
                continue;
            }
            Iterator<Tuple> tuples = candidate.iterator();
            while (tuples.hasNext()) {
                Tuple current = tuples.next();
                int col = current.size();
                while (col < widestSize) {
                    current.append(widest.get(col));
                    ++col;
                }
            }
        }
    }

    // finally merge in the existing base data of every LOAD
    for (Map.Entry<LOLoad, DataBag> existing : baseData.entrySet()) {
        DataBag merged = newBaseData.get(existing.getKey());
        if (merged == null) {
            merged = BagFactory.getInstance().newDefaultBag();
            newBaseData.put(existing.getKey(), merged);
        }
        merged.addAll(existing.getValue());
    }
    return newBaseData;
}
/**
 * @return the limit each LOLimit's physical operator had before
 *         visit(LOLimit) changed it, or {@code null} if no LIMIT has been
 *         augmented yet
 */
public Map<LOLimit, Long> getOriLimitMap() {
    return this.oriLimitMap;
}
/**
 * Back-propagates constraints through a COGROUP to its inputs.
 * <p>
 * Only cogroups whose group-by expressions are plain column projections, with
 * the same number of keys on every input, are handled; any other cogroup is
 * left untouched. For a handled cogroup, two constraint tuples carrying the
 * group label are added to every input for each pending output constraint,
 * and every group already present in the derived data is topped up to two
 * tuples per input.
 *
 * @param cg the cogroup being visited
 * @throws FrontendException if synthesizing the input constraints fails
 */
@Override
public void visit(LOCogroup cg) throws FrontendException {
    if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
        return;
    // fetch and consume the constraints pending on this cogroup's output
    DataBag outputConstraints = outputConstraintsMap.get(cg);
    outputConstraintsMap.remove(cg);
    boolean ableToHandle = true;
    // collect, per input, the columns that input is grouped on
    List<List<Integer>> groupSpecs = new LinkedList<List<Integer>>();
    int numCols = -1;
    for (int index = 0; index < cg.getInputs((LogicalPlan) plan).size(); ++index) {
        Collection<LogicalExpressionPlan> groupByPlans =
                cg.getExpressionPlans().get(index);
        List<Integer> groupCols = new ArrayList<Integer>();
        for (LogicalExpressionPlan groupByPlan : groupByPlans) {
            Operator leaf = groupByPlan.getSinks().get(0);
            if (leaf instanceof ProjectExpression) {
                groupCols.add(Integer.valueOf(((ProjectExpression) leaf).getColNum()));
            } else {
                ableToHandle = false;
                break;
            }
        }
        if (numCols == -1) {
            numCols = groupCols.size();
        }
        if (groupCols.size() != groupByPlans.size()
                || groupCols.size() != numCols) {
            // Unworkable cogroup plan. The flag must be cleared here as well:
            // a key-count mismatch leaves groupSpecs incomplete, and indexing
            // it below would otherwise fail.
            ableToHandle = false;
            break;
        }
        groupSpecs.add(groupCols);
    }
    // we should now have some workable data at this point to synthesize tuples
    try {
        if (ableToHandle) {
            int numInputs = cg.getInputs((LogicalPlan) plan).size();
            // first go through the pending output constraints
            if (outputConstraints != null) {
                for (Iterator<Tuple> it = outputConstraints.iterator(); it.hasNext();) {
                    Tuple outputConstraint = it.next();
                    Object groupLabel = outputConstraint.get(0);
                    for (int input = 0; input < numInputs; input++) {
                        Operator inputOp = cg.getInputs((LogicalPlan) plan).get(input);
                        int numInputFields =
                                ((LogicalRelationalOperator) inputOp).getSchema().size();
                        List<Integer> groupCols = groupSpecs.get(input);
                        DataBag output = outputConstraintsMap.get(inputOp);
                        if (output == null) {
                            output = BagFactory.getInstance().newDefaultBag();
                            outputConstraintsMap.put(inputOp, output);
                        }
                        // two tuples per input for every constrained group
                        for (int i = 0; i < 2; i++) {
                            Tuple inputConstraint = GetGroupByInput(
                                    groupLabel, groupCols, numInputFields);
                            if (inputConstraint != null)
                                output.add(inputConstraint);
                        }
                    }
                }
            }
            // then go through all organic data groups and add input
            // constraints to make each group big enough
            DataBag outputData = derivedData.get(cg);
            for (Iterator<Tuple> it = outputData.iterator(); it.hasNext();) {
                Tuple groupTup = it.next();
                Object groupLabel = groupTup.get(0);
                for (int input = 0; input < numInputs; input++) {
                    Operator inputOp = cg.getInputs((LogicalPlan) plan).get(input);
                    int numInputFields =
                            ((LogicalRelationalOperator) inputOp).getSchema().size();
                    List<Integer> groupCols = groupSpecs.get(input);
                    DataBag output = outputConstraintsMap.get(inputOp);
                    if (output == null) {
                        output = BagFactory.getInstance().newDefaultBag();
                        outputConstraintsMap.put(inputOp, output);
                    }
                    // field input + 1 of the group tuple is the bag of this input's tuples
                    int numTupsToAdd = 2
                            - (int) ((DataBag) groupTup.get(input + 1)).size();
                    for (int i = 0; i < numTupsToAdd; i++) {
                        Tuple inputConstraint = GetGroupByInput(groupLabel,
                                groupCols, numInputFields);
                        if (inputConstraint != null)
                            output.add(inputConstraint);
                    }
                }
            }
        }
    } catch (Exception e) {
        String message =
                "Error visiting Cogroup during Augmentation phase of Example Generator! "
                        + e.getMessage();
        // keep the cause so the original stack trace is not lost
        log.error(message, e);
        throw new FrontendException(message, e);
    }
}
/**
 * Back-propagates constraints through a JOIN to its inputs.
 * <p>
 * Only joins whose key expressions are plain column projections, with the
 * same number of keys on every input, are handled; any other join is left
 * untouched. Every pending output constraint is turned into one constraint
 * per input. If the join currently produces no output at all, one tuple of
 * the first input (a pending constraint if there is one, otherwise derived
 * data) is used to build a constraint with matching keys for every other
 * input.
 *
 * @param join the join being visited
 * @throws FrontendException if synthesizing the input constraints fails
 */
@Override
public void visit(LOJoin join) throws FrontendException {
    if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
        return;
    // fetch and consume the constraints pending on this join's output
    DataBag outputConstraints = outputConstraintsMap.get(join);
    outputConstraintsMap.remove(join);
    boolean ableToHandle = true;
    // collect, per input, the columns that input is joined on
    List<List<Integer>> groupSpecs = new LinkedList<List<Integer>>();
    int numCols = -1;
    for (int index = 0; index < join.getInputs((LogicalPlan) plan).size(); ++index) {
        Collection<LogicalExpressionPlan> joinKeyPlans =
                join.getExpressionPlans().get(index);
        List<Integer> groupCols = new ArrayList<Integer>();
        for (LogicalExpressionPlan joinKeyPlan : joinKeyPlans) {
            Operator leaf = joinKeyPlan.getSinks().get(0);
            if (leaf instanceof ProjectExpression) {
                groupCols.add(Integer.valueOf(((ProjectExpression) leaf).getColNum()));
            } else {
                ableToHandle = false;
                break;
            }
        }
        if (numCols == -1) {
            numCols = groupCols.size();
        }
        if (groupCols.size() != joinKeyPlans.size()
                || groupCols.size() != numCols) {
            // Unworkable join plan. The flag must be cleared here as well:
            // a key-count mismatch leaves groupSpecs incomplete, and indexing
            // it below would otherwise fail.
            ableToHandle = false;
            break;
        }
        groupSpecs.add(groupCols);
    }
    // we should now have some workable data at this point to synthesize tuples
    try {
        if (ableToHandle) {
            int numInputs = join.getInputs((LogicalPlan) plan).size();
            // first go through the pending output constraints
            if (outputConstraints != null) {
                for (Iterator<Tuple> it = outputConstraints.iterator(); it.hasNext();) {
                    Tuple outputConstraint = it.next();
                    for (int input = 0; input < numInputs; input++) {
                        Operator inputOp = join.getInputs((LogicalPlan) plan).get(input);
                        int numInputFields =
                                ((LogicalRelationalOperator) inputOp).getSchema().size();
                        List<Integer> groupCols = groupSpecs.get(input);
                        DataBag output = outputConstraintsMap.get(inputOp);
                        if (output == null) {
                            output = BagFactory.getInstance().newDefaultBag();
                            outputConstraintsMap.put(inputOp, output);
                        }
                        Tuple inputConstraint = GetJoinInput(
                                outputConstraint, groupCols, numInputFields);
                        if (inputConstraint != null)
                            output.add(inputConstraint);
                    }
                }
            }
            // then, if the join yields nothing, derive matching tuples for
            // the other inputs from one tuple of the first input
            DataBag outputData = derivedData.get(join);
            if (outputData.size() == 0) {
                Operator firstInput = join.getInputs((LogicalPlan) plan).get(0);
                DataBag output0 = outputConstraintsMap.get(firstInput);
                if (output0 == null || output0.size() == 0) {
                    output0 = derivedData.get(firstInput);
                }
                Tuple inputConstraint0 = output0.iterator().next();
                for (int input = 1; input < numInputs; input++) {
                    Operator inputOp = join.getInputs((LogicalPlan) plan).get(input);
                    DataBag output = outputConstraintsMap.get(inputOp);
                    if (output == null) {
                        output = BagFactory.getInstance().newDefaultBag();
                        outputConstraintsMap.put(inputOp, output);
                    }
                    int numInputFields =
                            ((LogicalRelationalOperator) inputOp).getSchema().size();
                    Tuple inputConstraint = GetJoinInput(inputConstraint0,
                            groupSpecs.get(0), groupSpecs.get(input), numInputFields);
                    if (inputConstraint != null)
                        output.add(inputConstraint);
                }
            }
        }
    } catch (Exception e) {
        String message =
                "Error visiting Join during Augmentation phase of Example Generator! "
                        + e.getMessage();
        // keep the cause so the original stack trace is not lost
        log.error(message, e);
        throw new FrontendException(message, e);
    }
}
/**
 * Visits a CROSS operator. No augmentation is performed: the body is empty,
 * so no constraints are consumed or propagated here.
 *
 * @param cs the cross operator being visited
 */
@Override
public void visit(LOCross cs) throws FrontendException {
}
/**
 * Back-propagates constraints through a DISTINCT and makes sure its input
 * contains at least one duplicate, so that the operator visibly removes
 * something.
 * <p>
 * Pending output constraints are passed on to the input unchanged. If the
 * input then has no constraints at all, the input's derived data is copied
 * in. If no two constraint tuples are equal, a synthetic copy of the first
 * one is appended.
 *
 * @param dt the distinct operator being visited
 * @throws FrontendException declared by the visitor interface
 */
@Override
public void visit(LODistinct dt) throws FrontendException {
    if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
        return;
    DataBag outputConstraints = outputConstraintsMap.get(dt);
    outputConstraintsMap.remove(dt);
    Operator input = dt.getInput((LogicalPlan) plan);
    DataBag inputConstraints = outputConstraintsMap.get(input);
    if (inputConstraints == null) {
        inputConstraints = BagFactory.getInstance().newDefaultBag();
        outputConstraintsMap.put(input, inputConstraints);
    }
    // whatever must come out of DISTINCT must also go into it
    if (outputConstraints != null && outputConstraints.size() > 0) {
        for (Iterator<Tuple> it = outputConstraints.iterator(); it.hasNext();) {
            inputConstraints.add(it.next());
        }
    }
    // nothing constrained so far: work from the data the input really produces
    if (inputConstraints.size() == 0) {
        DataBag inputData = derivedData.get(input);
        for (Iterator<Tuple> it = inputData.iterator(); it.hasNext();) {
            inputConstraints.add(it.next());
        }
    }
    // Look for an existing duplicate. An explicit flag is used because the
    // iterator is also exhausted when the duplicate is the very last tuple,
    // so "iterator exhausted" does not imply "no duplicate".
    Set<Tuple> distinctSet = new HashSet<Tuple>();
    boolean duplicateFound = false;
    for (Iterator<Tuple> it = inputConstraints.iterator(); it.hasNext();) {
        if (!distinctSet.add(it.next())) {
            duplicateFound = true;
            break;
        }
    }
    if (!duplicateFound && inputConstraints.size() > 0) {
        // no duplicates found: generate one by copying the first tuple
        Tuple src = ((ExampleTuple) inputConstraints.iterator().next()).toTuple();
        Tuple tgt = TupleFactory.getInstance().newTuple(src.getAll());
        ExampleTuple inputConstraint = new ExampleTuple(tgt);
        inputConstraint.synthetic = true;
        inputConstraints.add(inputConstraint);
    }
    // NOTE(review): the previous code also called inputConstraints.clear() here,
    // but only in a branch where the bag was already empty (a no-op). It may have
    // been meant to discard copied input data once a duplicate was found - confirm.
}
/**
 * Back-propagates constraints through a FILTER.
 * <p>
 * Each pending output constraint is turned into an input constraint that is
 * meant to pass the filter. Without pending constraints, one passing tuple is
 * requested when the filter currently outputs nothing. In addition, when
 * every input tuple currently passes the filter, one tuple that is meant to
 * fail it is requested as a negative example.
 *
 * @param filter the filter being visited
 * @throws FrontendException if generating a constraint tuple fails
 */
@Override
public void visit(LOFilter filter) throws FrontendException {
    if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
        return;
    DataBag outputConstraints = outputConstraintsMap.get(filter);
    outputConstraintsMap.remove(filter);
    LogicalExpressionPlan filterCond = filter.getFilterPlan();
    Operator input = filter.getInput((LogicalPlan) plan);
    DataBag inputConstraints = outputConstraintsMap.get(input);
    if (inputConstraints == null) {
        inputConstraints = BagFactory.getInstance().newDefaultBag();
        outputConstraintsMap.put(input, inputConstraints);
    }
    DataBag outputData = derivedData.get(filter);
    DataBag inputData = derivedData.get(input);
    try {
        if (outputConstraints != null && outputConstraints.size() > 0) {
            // there are output constraints: generate corresponding input constraints
            for (Iterator<Tuple> it = outputConstraints.iterator(); it.hasNext();) {
                Tuple outputConstraint = it.next();
                ExampleTuple inputConstraint = GenerateMatchingTuple(
                        outputConstraint, filterCond, false);
                if (inputConstraint != null)
                    inputConstraints.add(inputConstraint);
            }
        } else if (outputData.size() == 0) {
            // no output constraints, but output is empty: generate one input
            // that will pass the filter
            ExampleTuple inputConstraint = GenerateMatchingTuple(
                    filter.getSchema(), filterCond, false);
            if (inputConstraint != null)
                inputConstraints.add(inputConstraint);
        }
        // if necessary, insert a negative example (i.e. a tuple that does
        // not pass the filter)
        if (outputData.size() == inputData.size()) {
            // all tuples pass the filter: generate one input that will not
            ExampleTuple inputConstraint = GenerateMatchingTuple(
                    filter.getSchema(), filterCond, true);
            if (inputConstraint != null)
                inputConstraints.add(inputConstraint);
        }
    } catch (Exception e) {
        String message =
                "Error visiting Filter during Augmentation phase of Example Generator! "
                        + e.getMessage();
        log.error(message, e);
        throw new FrontendException(message, e);
    }
}
/**
 * Back-propagates constraints through a FOREACH.
 * <p>
 * Nothing happens when no output constraints are pending. Otherwise only the
 * case where the first sink of the inner plan is a column projection,
 * optionally wrapped in a cast, is handled: each output constraint is mapped
 * back onto the schema of the FOREACH's predecessor in the enclosing plan.
 * Constraints that cannot be mapped back are dropped.
 *
 * @param forEach the foreach being visited
 * @throws FrontendException if back-propagation fails
 */
@Override
public void visit(LOForEach forEach) throws FrontendException {
    if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
        return;
    DataBag outputConstraints = outputConstraintsMap.get(forEach);
    outputConstraintsMap.remove(forEach);
    if (outputConstraints == null || outputConstraints.size() == 0)
        // we dont have to do anything in this case
        return;

    // Named innerPlan so that it does not shadow the visitor's own plan,
    // which is the one that knows the predecessors of forEach.
    LogicalPlan innerPlan = forEach.getInnerPlan();
    Operator op = innerPlan.getSinks().get(0);
    boolean cast = false;
    if (op instanceof CastExpression) {
        cast = true;
        op = ((CastExpression) op).getExpression();
    }
    if (!(op instanceof ProjectExpression)) {
        // we can only handle simple projections
        return;
    }
    List<Integer> cols = new ArrayList<Integer>();
    cols.add(Integer.valueOf(((ProjectExpression) op).getColNum()));

    try {
        Operator input = plan.getPredecessors(forEach).get(0);
        LogicalSchema inputSchema = ((LogicalRelationalOperator) input).getSchema();
        DataBag output = BagFactory.getInstance().newDefaultBag();
        for (Iterator<Tuple> it = outputConstraints.iterator(); it.hasNext();) {
            Tuple inputConstraint = BackPropConstraint(
                    it.next(), cols, inputSchema, cast);
            // BackPropConstraint returns null for conflicting constraints
            if (inputConstraint != null)
                output.add(inputConstraint);
        }
        outputConstraintsMap.put(input, output);
    } catch (Exception e) {
        String message =
                "Operator error during Augmenting Phase in Example Generator "
                        + e.getMessage();
        log.error(message, e);
        throw new FrontendException(message, e);
    }
}
/**
 * Turns the constraints pending on a LOAD into synthetic base tuples.
 * <p>
 * If no constraints are pending, a single blank constraint sized to the
 * schema is used so that at least one tuple is produced. For every
 * constraint a tuple is built in which each null ("don't care") field is
 * filled with the corresponding field of the first real input tuple. The
 * tuple is added to the new base data if it is synthetic or not already
 * contained in the real input.
 *
 * @param load the load being visited
 * @throws FrontendException if the schema or a tuple field cannot be accessed
 * @throws RuntimeException  if the load has no input data or no schema, or a
 *                           constraint does not have one field per schema field
 */
@Override
public void visit(LOLoad load) throws FrontendException {
    final String errorPrefix =
            "Error visiting Load during Augmentation phase of Example Generator! ";
    DataBag inputData = baseData.get(load);
    // check if the inputData exists
    if (inputData == null || inputData.size() == 0) {
        log.error("No (valid) input data found!");
        throw new RuntimeException("No (valid) input data found!");
    }
    DataBag newInputData = newBaseData.get(load);
    if (newInputData == null) {
        newInputData = BagFactory.getInstance().newDefaultBag();
        newBaseData.put(load, newInputData);
    }
    LogicalSchema schema;
    try {
        schema = load.getSchema();
    } catch (FrontendException e) {
        log.error(errorPrefix + e.getMessage(), e);
        throw new FrontendException(errorPrefix + e.getMessage(), e);
    }
    if (schema == null)
        throw new RuntimeException(
                "Example Generator requires a schema. Please provide a schema while loading data");

    // example tuple to steal values from when we encounter
    // "don't care" fields (i.e. null fields)
    Tuple exampleTuple = inputData.iterator().next();
    if (log.isDebugEnabled())
        log.debug("Example tuple used for don't-care fields: " + exampleTuple);

    DataBag outputConstraints = outputConstraintsMap.get(load);
    outputConstraintsMap.remove(load);
    // first of all, we are required to guarantee that there is at least one
    // output tuple
    if (outputConstraints == null || outputConstraints.size() == 0) {
        outputConstraints = BagFactory.getInstance().newDefaultBag();
        outputConstraints.add(TupleFactory.getInstance().newTuple(
                schema.getFields().size()));
    }
    // run through output constraints; for each one synthesize a tuple and
    // add it to the base data (while synthesizing individual fields, try
    // to match fields that exist in the real data)
    for (Iterator<Tuple> it = outputConstraints.iterator(); it.hasNext();) {
        Tuple outputConstraint = it.next();
        // sanity check:
        if (outputConstraint.size() != schema.getFields().size())
            throw new RuntimeException(
                    "Internal error: incorrect number of fields in constraint tuple.");
        ExampleTuple inputTuple = new ExampleTuple(
                TupleFactory.getInstance().newTuple(outputConstraint.size()));
        try {
            for (int i = 0; i < inputTuple.size(); i++) {
                Object d = outputConstraint.get(i);
                if (d == null && i < exampleTuple.size())
                    d = exampleTuple.get(i);
                inputTuple.set(i, d);
            }
            if (outputConstraint instanceof ExampleTuple)
                inputTuple.synthetic = ((ExampleTuple) outputConstraint).synthetic;
            else
                // raw tuple should have been synthesized
                inputTuple.synthetic = true;

            if (inputTuple.synthetic || !inInput(inputTuple, inputData, schema)) {
                inputTuple.synthetic = true;
                newInputData.add(inputTuple);
            }
        } catch (ExecException e) {
            log.error(errorPrefix + e.getMessage(), e);
            throw new FrontendException(errorPrefix + e.getMessage(), e);
        }
    }
}
/**
 * Tells whether {@code input} already contains a tuple whose first
 * {@code schema.size()} fields equal those of {@code newTuple}.
 * Null fields are compared null-safely: two nulls are equal, a null and a
 * non-null value are not.
 *
 * @param newTuple the candidate tuple
 * @param input    the bag to search
 * @param schema   schema whose size gives the number of fields to compare
 * @return {@code true} if a field-wise equal tuple exists in {@code input}
 * @throws ExecException if a field cannot be read
 */
private boolean inInput(Tuple newTuple, DataBag input, LogicalSchema schema) throws ExecException {
    int fieldCount = schema.size();
    for (Iterator<Tuple> iter = input.iterator(); iter.hasNext();) {
        Tuple candidate = iter.next();
        boolean allEqual = true;
        for (int i = 0; i < fieldCount; ++i) {
            Object wanted = newTuple.get(i);
            Object present = candidate.get(i);
            // null-safe equality; fields may legitimately be null
            boolean same = (wanted == null) ? (present == null) : wanted.equals(present);
            if (!same) {
                allEqual = false;
                break;
            }
        }
        if (allEqual)
            return true;
    }
    return false;
}
/**
 * Passes the constraints pending on a SORT straight through to its input;
 * when none are pending the input receives a fresh empty bag.
 *
 * @param s the sort being visited
 */
@Override
public void visit(LOSort s) throws FrontendException {
    boolean skip = limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag();
    if (skip) {
        return;
    }
    DataBag pending = outputConstraintsMap.get(s);
    outputConstraintsMap.remove(s);
    Operator sortInput = s.getInput((LogicalPlan) plan);
    DataBag forInput = (pending != null)
            ? pending
            : BagFactory.getInstance().newDefaultBag();
    outputConstraintsMap.put(sortInput, forInput);
}
/**
 * Visits a SPLIT. No constraints are consumed or propagated; the only work
 * done is the LIMIT-mode check shared with the other visit methods.
 *
 * @param split the split being visited
 */
@Override
public void visit(LOSplit split) throws FrontendException {
    boolean skip = limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag();
    if (skip) {
        return;
    }
}
/**
 * Moves the constraints pending on a STORE to its first predecessor; when
 * none are pending the predecessor receives a fresh empty bag.
 *
 * @param store the store being visited
 */
@Override
public void visit(LOStore store) throws FrontendException {
    boolean skip = limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag();
    if (skip) {
        return;
    }
    DataBag pending = outputConstraintsMap.get(store);
    if (pending != null) {
        // hand the store's constraints over to whatever feeds it
        outputConstraintsMap.remove(store);
        Operator feeder = plan.getPredecessors(store).get(0);
        outputConstraintsMap.put(feeder, pending);
        return;
    }
    Operator predecessor = plan.getPredecessors(store).get(0);
    outputConstraintsMap.put(predecessor, BagFactory.getInstance().newDefaultBag());
}
/**
 * Distributes the constraints pending on a UNION over its inputs.
 * <p>
 * Every input gets a fresh bag (replacing any bag it had before). Pending
 * constraints, if any, are then dealt out to the inputs round-robin.
 *
 * @param u the union being visited
 */
@Override
public void visit(LOUnion u) throws FrontendException {
    boolean skip = limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag();
    if (skip) {
        return;
    }
    DataBag pending = outputConstraintsMap.get(u);
    outputConstraintsMap.remove(u);
    boolean nothingPending = pending == null || pending.size() == 0;

    // in either case each input starts out with an empty constraint bag
    List<Operator> inputs = u.getInputs((LogicalPlan) plan);
    int inputCount = inputs.size();
    for (Operator unionInput : inputs) {
        outputConstraintsMap.put(unionInput, BagFactory.getInstance().newDefaultBag());
    }
    if (nothingPending) {
        return;
    }

    // deal the constraints out to the inputs in turn
    int target = 0;
    Iterator<Tuple> constraints = pending.iterator();
    while (constraints.hasNext()) {
        DataBag targetBag = outputConstraintsMap.get(inputs.get(target));
        targetBag.add(constraints.next());
        target = (target + 1) % inputCount;
    }
}
/**
 * Augments a LIMIT; only active after {@code setLimit()} has been called.
 * <p>
 * Pending output constraints are passed to the input; without any, and with
 * no constraints on the input yet, the input's derived data is used instead.
 * Whenever the input ends up with exactly one constraint, one more tuple of
 * the derived input data is added and the walker's branch flag is set. The
 * original limit is recorded in the map returned by getOriLimitMap(), and
 * both the physical and the logical limit are set to one less than the
 * number of input constraints.
 *
 * @param lm the limit being visited
 */
@Override
public void visit(LOLimit lm) throws FrontendException {
    if (!limit) {
        // not augment for LIMIT in this traversal
        return;
    }
    if (oriLimitMap == null) {
        oriLimitMap = new HashMap<LOLimit, Long>();
    }
    DataBag pending = outputConstraintsMap.get(lm);
    outputConstraintsMap.remove(lm);
    DataBag upstream = outputConstraintsMap.get(lm.getInput((LogicalPlan) plan));
    if (upstream == null) {
        upstream = BagFactory.getInstance().newDefaultBag();
        outputConstraintsMap.put(lm.getInput((LogicalPlan) plan), upstream);
    }
    DataBag inputData = derivedData.get(lm.getInput((LogicalPlan) plan));

    boolean havePending = pending != null && pending.size() > 0;
    if (havePending) {
        // one or more output constraints: generate corresponding input constraints
        Iterator<Tuple> constraints = pending.iterator();
        while (constraints.hasNext()) {
            upstream.add(constraints.next());
            padLoneLimitConstraint(upstream, inputData);
        }
    } else if (upstream.size() == 0) {
        // add all input to input constraints ...
        upstream.addAll(inputData);
        padLoneLimitConstraint(upstream, inputData);
    }

    POLimit physicalLimit = (POLimit) logToPhysMap.get(lm);
    oriLimitMap.put(lm, Long.valueOf(physicalLimit.getLimit()));
    physicalLimit.setLimit(upstream.size()-1);
    lm.setLimit(physicalLimit.getLimit());
}

// If the bag holds exactly one constraint, adds one more tuple taken from the
// derived input data and sets the walker's branch flag.
private void padLoneLimitConstraint(DataBag constraints, DataBag inputData) {
    if (constraints.size() == 1) {
        constraints.add(inputData.iterator().next());
        ((PreOrderDepthFirstWalker) currentWalker).setBranchFlag();
    }
}
/**
 * Builds an input constraint for a group: a tuple of {@code numFields}
 * fields in which only the group columns are set.
 *
 * @param groupLabel the group key; used as-is for a single group column,
 *                   otherwise it must be a Tuple with one field per column
 * @param groupCols  positions of the group columns in the input
 * @param numFields  number of fields of the input
 * @return the constraint tuple
 * @throws ExecException    if a field cannot be read or written
 * @throws RuntimeException if several columns are grouped on but the label
 *                          is not a Tuple
 */
Tuple GetGroupByInput(Object groupLabel, List<Integer> groupCols,
        int numFields) throws ExecException {
    Tuple constraint = TupleFactory.getInstance().newTuple(numFields);
    if (groupCols.size() == 1) {
        // single key: the label itself is the key value
        constraint.set(groupCols.get(0), groupLabel);
        return constraint;
    }
    if (!(groupLabel instanceof Tuple)) {
        throw new RuntimeException("Unrecognized group label!");
    }
    Tuple keyValues = (Tuple) groupLabel;
    int keyIndex = 0;
    while (keyIndex < groupCols.size()) {
        constraint.set(groupCols.get(keyIndex), keyValues.get(keyIndex));
        keyIndex++;
    }
    return constraint;
}
/**
 * Builds a constraint for one join input whose key columns carry the key
 * values found in a tuple of the first join input.
 *
 * @param group      tuple of the first input to copy the key values from
 * @param groupCols0 key column positions within {@code group}
 * @param groupCols  key column positions within the tuple being built
 * @param numFields  number of fields of the tuple being built
 * @return the constraint tuple
 * @throws ExecException    if a field cannot be read or written
 * @throws RuntimeException if there is not exactly one key column and
 *                          {@code group} is null
 */
Tuple GetJoinInput(Tuple group, List<Integer> groupCols0, List<Integer> groupCols,
        int numFields) throws ExecException {
    Tuple constraint = TupleFactory.getInstance().newTuple(numFields);
    if (groupCols.size() == 1) {
        // single key column
        int targetCol = groupCols.get(0);
        Object keyValue = group.get(groupCols0.get(0));
        constraint.set(targetCol, keyValue);
        return constraint;
    }
    // for a variable declared as Tuple, "not an instance of Tuple" means null
    if (group == null) {
        throw new RuntimeException("Unrecognized group label!");
    }
    int keyIndex = 0;
    while (keyIndex < groupCols.size()) {
        constraint.set(groupCols.get(keyIndex), group.get(groupCols0.get(keyIndex)));
        keyIndex++;
    }
    return constraint;
}
/**
 * Builds a constraint for one join input from a tuple constraining the join
 * output.
 *
 * @param group     the tuple to take values from
 * @param groupCols key column positions within the tuple being built
 * @param numFields number of fields of the tuple being built
 * @return the constraint tuple
 * @throws ExecException    if a field cannot be read or written
 * @throws RuntimeException if there is not exactly one key column and
 *                          {@code group} is null
 */
Tuple GetJoinInput(Tuple group, List<Integer> groupCols,
        int numFields) throws ExecException {
    Tuple constraint = TupleFactory.getInstance().newTuple(numFields);
    if (groupCols.size() == 1) {
        // NOTE(review): this stores the whole tuple as the key field's value,
        // not one of its fields - confirm that this is intended.
        constraint.set(groupCols.get(0), group);
        return constraint;
    }
    // for a variable declared as Tuple, "not an instance of Tuple" means null
    if (group == null) {
        throw new RuntimeException("Unrecognized group label!");
    }
    int keyIndex = 0;
    while (keyIndex < groupCols.size()) {
        // the i-th field of the tuple goes into the i-th key column
        constraint.set(groupCols.get(keyIndex), group.get(keyIndex));
        keyIndex++;
    }
    return constraint;
}
/**
 * Maps a constraint on an operator's output back onto its input schema.
 * <p>
 * Output column {@code i} is written to input column {@code cols.get(i)}.
 * Null output fields ("don't care") leave the input untouched. With
 * {@code cast} set, the value is stored as a {@link DataByteArray} holding
 * the UTF-8 bytes of its string form, so the result does not depend on the
 * platform default charset.
 *
 * @param outputConstraint the constraint on the output
 * @param cols             input column for each output column
 * @param inputSchema      schema of the input; determines the result's size
 * @param cast             whether the projected column is cast on its way out
 * @return the input constraint, or {@code null} if two non-null output
 *         fields map to the same input column
 * @throws ExecException if a field cannot be read or written
 */
Tuple BackPropConstraint(Tuple outputConstraint, List<Integer> cols,
        LogicalSchema inputSchema, boolean cast) throws ExecException {
    Tuple inputConstraint = new ExampleTuple(
            TupleFactory.getInstance().newTuple(inputSchema.getFields().size()));
    for (int outCol = 0; outCol < outputConstraint.size(); outCol++) {
        int inCol = cols.get(outCol);
        Object outVal = outputConstraint.get(outCol);
        Object inVal = inputConstraint.get(inCol);
        if (outVal == null) {
            // nothing demanded of this column
            continue;
        }
        if (inVal != null) {
            // unable to back-propagate, due to conflicting column
            // constraints, so give up
            return null;
        }
        if (cast) {
            byte[] raw = outVal.toString().getBytes(
                    java.nio.charset.Charset.forName("UTF-8"));
            inputConstraint.set(inCol, new DataByteArray(raw));
        } else {
            inputConstraint.set(inCol, outVal);
        }
    }
    return inputConstraint;
}
/**
 * Generates a constraint tuple for the given schema and predicate by
 * starting from a fresh tuple with one field per schema field and
 * delegating to the Tuple-based overload.
 *
 * @param schema the schema whose field count sizes the tuple
 * @param plan   the predicate
 * @param invert {@code true} to ask for a tuple that fails the predicate
 * @return whatever the Tuple-based overload returns for the fresh tuple
 */
ExampleTuple GenerateMatchingTuple(LogicalSchema schema, LogicalExpressionPlan plan,
        boolean invert) throws FrontendException, ExecException {
    TupleFactory factory = TupleFactory.getInstance();
    Tuple unconstrained = factory.newTuple(schema.getFields().size());
    return GenerateMatchingTuple(unconstrained, plan, invert);
}
/**
 * Generates a constraint tuple that conforms to the given constraint and is
 * meant to pass (or, with {@code invert}, fail) the predicate.
 * <p>
 * For now, constraint tuples are tuples whose fields are a blend of actual
 * data values and nulls, where a null stands for "don't care". In the
 * future it may be worth replacing "don't care" with a richer constraint
 * language; this would help, e.g., with two filters in a row, where the
 * downstream filter wants to tell the upstream filter what predicate must
 * hold for a given field.
 *
 * @param constraint the constraint to start from; it is copied, not modified
 * @param predicate  the predicate; its first source is taken as the root
 * @param invert     {@code true} to ask for a tuple that fails the predicate
 * @return a synthetic tuple, or {@code null} when the root of the predicate
 *         is a user-defined function, for which no tuple can be generated
 */
ExampleTuple GenerateMatchingTuple(Tuple constraint, LogicalExpressionPlan predicate,
        boolean invert) throws ExecException, FrontendException {
    Operator root = predicate.getSources().get(0);
    if (root instanceof UserFuncExpression) {
        // Don't know how to generate an input tuple for a UDF. The decision is
        // made here because a callee cannot null out this method's result.
        return null;
    }
    Tuple t = TupleFactory.getInstance().newTuple(constraint.size());
    ExampleTuple tOut = new ExampleTuple(t);
    for (int i = 0; i < t.size(); i++)
        tOut.set(i, constraint.get(i));
    GenerateMatchingTupleHelper(tOut, root, invert);
    tOut.synthetic = true;
    return tOut;
}
/**
 * Dispatches on the kind of predicate node to the matching overload.
 * Binary, NOT and IS NULL expressions are forwarded; user-defined functions
 * are accepted but leave the tuple untouched.
 *
 * @param t      the tuple to fill in
 * @param pred   the predicate node
 * @param invert {@code true} to aim for a tuple that fails the predicate
 * @throws FrontendException if the node is of any other kind
 */
void GenerateMatchingTupleHelper(Tuple t, Operator pred,
        boolean invert) throws FrontendException, ExecException {
    if (pred instanceof BinaryExpression) {
        GenerateMatchingTupleHelper(t, (BinaryExpression) pred, invert);
        return;
    }
    if (pred instanceof NotExpression) {
        GenerateMatchingTupleHelper(t, (NotExpression) pred, invert);
        return;
    }
    if (pred instanceof IsNullExpression) {
        GenerateMatchingTupleHelper(t, (IsNullExpression) pred, invert);
        return;
    }
    if (pred instanceof UserFuncExpression) {
        // Don't know how to generate an input tuple for a UDF. Reassigning the
        // parameter would not be visible to the caller, so nothing is done here.
        return;
    }
    throw new FrontendException("Unknown operator in filter predicate");
}
/**
 * Fills null fields of {@code t} so that a binary comparison between
 * constants and/or projected columns evaluates to true, or to false when
 * {@code invert} is set.
 *
 * <p>AND / OR nodes are forwarded to their own overloads. Arithmetic and
 * regex expressions, and operands that are neither a constant nor a
 * (possibly cast) projection, are left alone. A projected column whose field
 * in {@code t} is already non-null is treated as a fixed value.
 *
 * <p>When one side is fixed, the other side is derived from it. If no
 * suitable value can be derived (the fixed value is null, or the
 * GetUnequalValue / GetSmallerValue / GetLargerValue helper returns null),
 * the tuple is left unchanged instead of failing with a
 * NullPointerException.
 *
 * @param t      tuple being filled in; modified in place
 * @param pred   binary expression to satisfy (or falsify)
 * @param invert when true, generate values that make {@code pred} false
 * @throws FrontendException propagated from the expression accessors and
 *                           from the AND / OR overloads
 * @throws ExecException     propagated from tuple access
 */
void GenerateMatchingTupleHelper(Tuple t, BinaryExpression pred,
        boolean invert) throws FrontendException, ExecException {
    if (pred instanceof AndExpression) {
        GenerateMatchingTupleHelper(t, (AndExpression) pred, invert);
        return;
    } else if (pred instanceof OrExpression) {
        GenerateMatchingTupleHelper(t, (OrExpression) pred, invert);
        return;
    }

    // We don't try to work around these operators right now
    if (pred instanceof AddExpression || pred instanceof SubtractExpression
            || pred instanceof MultiplyExpression || pred instanceof DivideExpression
            || pred instanceof ModExpression || pred instanceof RegexExpression) {
        return;
    }

    // now we are sure that the expression operators are the roots of the
    // plan
    boolean leftIsConst = false, rightIsConst = false;
    Object leftConst = null, rightConst = null;
    byte leftDataType = 0, rightDataType = 0;
    int leftCol = -1, rightCol = -1;

    // Resolve the left operand: a literal, or a projected column whose
    // current (non-null) field value is treated as fixed.
    if (pred.getLhs() instanceof ConstantExpression) {
        leftIsConst = true;
        leftConst = ((ConstantExpression) (pred.getLhs())).getValue();
    } else {
        LogicalExpression lhs = pred.getLhs();
        if (lhs instanceof CastExpression) {
            lhs = ((CastExpression) lhs).getExpression();
        }
        if (!(lhs instanceof ProjectExpression)) {
            return; // too hard
        }
        leftCol = ((ProjectExpression) lhs).getColNum();
        leftDataType = ((ProjectExpression) lhs).getType();
        Object d = t.get(leftCol);
        if (d != null) {
            leftIsConst = true;
            leftConst = d;
        }
    }

    // Resolve the right operand the same way.
    if (pred.getRhs() instanceof ConstantExpression) {
        rightIsConst = true;
        rightConst = ((ConstantExpression) (pred.getRhs())).getValue();
    } else {
        Operator rhs = pred.getRhs();
        if (rhs instanceof CastExpression) {
            rhs = ((CastExpression) rhs).getExpression();
        }
        if (!(rhs instanceof ProjectExpression)) {
            return; // too hard
        }
        rightCol = ((ProjectExpression) rhs).getColNum();
        rightDataType = ((ProjectExpression) rhs).getType();
        Object d = t.get(rightCol);
        if (d != null) {
            rightIsConst = true;
            rightConst = d;
        }
    }

    if (leftIsConst && rightIsConst) {
        return; // can't really change the result if both are constants
    }

    // Classify the comparison, testing the kinds in a fixed order.
    final boolean isEq = pred instanceof EqualExpression;
    final boolean isNe = !isEq && pred instanceof NotEqualExpression;
    final boolean isGt = !isEq && !isNe
            && (pred instanceof GreaterThanExpression
                    || pred instanceof GreaterThanEqualExpression);
    final boolean isLt = !isEq && !isNe && !isGt
            && (pred instanceof LessThanExpression
                    || pred instanceof LessThanEqualExpression);
    if (!(isEq || isNe || isGt || isLt)) {
        return; // not a comparison we know how to satisfy
    }

    // Inverting swaps == with != and >/>= with </<=, so only the
    // "effective" relation between left and right matters from here on.
    final boolean wantEqual = invert ? isNe : isEq;
    final boolean wantUnequal = invert ? isEq : isNe;
    final boolean wantLeftLarger = invert ? isLt : isGt;
    // otherwise: left must end up smaller than right

    if (leftIsConst || rightIsConst) {
        // Exactly one side is fixed; derive the other side from it.
        final Object known = leftIsConst ? leftConst : rightConst;
        final int targetCol = leftIsConst ? rightCol : leftCol;
        final byte targetType = leftIsConst ? rightDataType : leftDataType;
        if (known == null) {
            return; // nothing can be derived from a null constant
        }

        final Object derived;
        if (wantEqual) {
            derived = known;
        } else if (wantUnequal) {
            derived = GetUnequalValue(known);
        } else {
            // The free side must be smaller exactly when the fixed side
            // is the one that has to be larger.
            final boolean targetSmaller = (leftIsConst == wantLeftLarger);
            derived = targetSmaller ? GetSmallerValue(known)
                    : GetLargerValue(known);
        }

        // The helpers return null when they cannot produce a value; in
        // that case leave the field as it is.
        if (derived != null) {
            t.set(targetCol, generateData(targetType, derived.toString()));
        }
        return;
    }

    // Both sides are free: pick a canonical pair of small values.
    final String leftValue;
    final String rightValue;
    if (wantEqual) {
        leftValue = "0";
        rightValue = "0";
    } else if (wantLeftLarger) {
        leftValue = "1";
        rightValue = "0";
    } else {
        // unequal, or left smaller than right
        leftValue = "0";
        rightValue = "1";
    }
    t.set(leftCol, generateData(leftDataType, leftValue));
    t.set(rightCol, generateData(rightDataType, rightValue));
}
/**
 * Handles an AND node by processing its left operand and then its right
 * operand against the same tuple, with the same {@code invert} flag.
 *
 * @param t      tuple being filled in
 * @param op     AND expression whose operands are visited
 * @param invert passed through unchanged to both operands
 * @throws FrontendException propagated from operand handling
 * @throws ExecException     propagated from operand handling
 */
void GenerateMatchingTupleHelper(Tuple t, AndExpression op, boolean invert)
        throws FrontendException, ExecException {
    // Locals are typed as Operator so the generic dispatcher is selected.
    final Operator left = op.getLhs();
    GenerateMatchingTupleHelper(t, left, invert);

    final Operator right = op.getRhs();
    GenerateMatchingTupleHelper(t, right, invert);
}
/**
 * Handles an OR node by processing its left operand and then its right
 * operand against the same tuple, with the same {@code invert} flag.
 *
 * @param t      tuple being filled in
 * @param op     OR expression whose operands are visited
 * @param invert passed through unchanged to both operands
 * @throws FrontendException propagated from operand handling
 * @throws ExecException     propagated from operand handling
 */
void GenerateMatchingTupleHelper(Tuple t, OrExpression op, boolean invert)
        throws FrontendException, ExecException {
    // Locals are typed as Operator so the generic dispatcher is selected.
    final Operator left = op.getLhs();
    GenerateMatchingTupleHelper(t, left, invert);

    final Operator right = op.getRhs();
    GenerateMatchingTupleHelper(t, right, invert);
}
/**
 * Handles a NOT node by processing its operand with the {@code invert}
 * flag flipped.
 *
 * @param t      tuple being filled in
 * @param op     NOT expression whose operand is visited
 * @param invert current inversion state; the operand sees its negation
 * @throws FrontendException propagated from operand handling
 * @throws ExecException     propagated from operand handling
 */
void GenerateMatchingTupleHelper(Tuple t, NotExpression op, boolean invert)
        throws FrontendException, ExecException {
    final boolean flipped = !invert;
    final LogicalExpression negated = op.getExpression();
    GenerateMatchingTupleHelper(t, negated, flipped);
}
/**
 * Makes an IS NULL test on a projected column true (field set to null), or
 * false when {@code invert} is set (field set to the value generated from
 * "0" for the column's type).
 *
 * <p>The column is taken from the tested expression, after stripping one
 * cast, instead of always writing column 0. When the tested expression is
 * not a projection, no column can be determined and the tuple is left
 * unchanged, matching how the binary-comparison helper treats such operands.
 *
 * @param t      tuple being filled in; modified in place
 * @param op     IS NULL expression to satisfy (or falsify)
 * @param invert when true, generate a non-null value instead of null
 * @throws FrontendException propagated from the expression accessors
 * @throws ExecException     propagated from {@code t.set}
 */
void GenerateMatchingTupleHelper(Tuple t, IsNullExpression op, boolean invert)
        throws FrontendException, ExecException {
    LogicalExpression operand = op.getExpression();
    if (operand instanceof CastExpression) {
        operand = ((CastExpression) operand).getExpression();
    }
    if (!(operand instanceof ProjectExpression)) {
        return; // no single column to write to
    }

    final ProjectExpression projection = (ProjectExpression) operand;
    final int col = projection.getColNum();
    if (!invert) {
        t.set(col, null);
    } else {
        t.set(col, generateData(projection.getType(), "0"));
    }
}
/**
 * Returns a value of the same type as {@code v} that is not equal to it.
 *
 * <p>Booleans are negated directly, because generateData cannot build a
 * boolean from "0" or "1". For other types the value generated from "0" is
 * returned, unless {@code v} already equals it, in which case the value
 * generated from "1" is returned.
 *
 * @param v the value to differ from; may be null
 * @return a different value, or null when {@code v} is null, is a bag,
 *         tuple or map, or when generateData yields null for its type
 */
Object GetUnequalValue(Object v) {
    if (v == null) {
        return null; // nothing to compare against
    }
    if (v instanceof Boolean) {
        return Boolean.valueOf(!((Boolean) v).booleanValue());
    }
    byte type = DataType.findType(v);
    if (type == DataType.BAG || type == DataType.TUPLE
            || type == DataType.MAP) {
        return null;
    }
    Object zero = generateData(type, "0");
    if (v.equals(zero)) {
        return generateData(type, "1");
    }
    return zero;
}
/**
 * Produces a value intended to compare smaller than {@code v}.
 *
 * <ul>
 * <li>chararray / bytearray: the value with its last character / byte
 * removed; null when it is already empty</li>
 * <li>numeric types: the value minus one</li>
 * <li>datetime: one unit earlier, where the unit is the finest of
 * millisecond, second, minute, hour whose field is non-zero, or a day when
 * all of those fields are zero</li>
 * <li>bag, tuple, map and any other type: null</li>
 * </ul>
 *
 * NOTE(review): the int/long subtraction is plain Java arithmetic, so it
 * wraps around at the minimum value.
 *
 * @param v the reference value
 * @return the derived value, or null when none is produced
 */
Object GetSmallerValue(Object v) {
    final byte kind = DataType.findType(v);
    if (kind == DataType.BAG || kind == DataType.TUPLE
            || kind == DataType.MAP) {
        return null;
    }

    if (kind == DataType.CHARARRAY) {
        final String text = (String) v;
        return text.length() > 0
                ? text.substring(0, text.length() - 1)
                : null;
    }
    if (kind == DataType.BYTEARRAY) {
        final DataByteArray bytes = (DataByteArray) v;
        return bytes.size() > 0
                ? new DataByteArray(bytes.get(), 0, bytes.size() - 1)
                : null;
    }
    if (kind == DataType.INTEGER) {
        return Integer.valueOf(((Integer) v).intValue() - 1);
    }
    if (kind == DataType.LONG) {
        return Long.valueOf(((Long) v).longValue() - 1);
    }
    if (kind == DataType.FLOAT) {
        return Float.valueOf(((Float) v).floatValue() - 1);
    }
    if (kind == DataType.DOUBLE) {
        return Double.valueOf(((Double) v).doubleValue() - 1);
    }
    if (kind == DataType.BIGINTEGER) {
        return ((BigInteger) v).subtract(BigInteger.ONE);
    }
    if (kind == DataType.BIGDECIMAL) {
        return ((BigDecimal) v).subtract(BigDecimal.ONE);
    }
    if (kind == DataType.DATETIME) {
        final DateTime when = (DateTime) v;
        // Step back by the finest unit that has a non-zero field.
        if (when.getMillisOfSecond() != 0) {
            return when.minusMillis(1);
        }
        if (when.getSecondOfMinute() != 0) {
            return when.minusSeconds(1);
        }
        if (when.getMinuteOfHour() != 0) {
            return when.minusMinutes(1);
        }
        if (when.getHourOfDay() != 0) {
            return when.minusHours(1);
        }
        return when.minusDays(1);
    }
    return null;
}
/**
 * Produces a value intended to compare larger than {@code v}.
 *
 * <ul>
 * <li>chararray: the value with "0" appended</li>
 * <li>bytearray: a new byte array built from the value's string form with
 * "0" appended</li>
 * <li>numeric types: the value plus one</li>
 * <li>datetime: one unit later, where the unit is the finest of
 * millisecond, second, minute, hour whose field is non-zero, or a day when
 * all of those fields are zero</li>
 * <li>bag, tuple, map and any other type: null</li>
 * </ul>
 *
 * NOTE(review): the int/long addition is plain Java arithmetic, so it wraps
 * around at the maximum value.
 *
 * @param v the reference value
 * @return the derived value, or null when none is produced
 */
Object GetLargerValue(Object v) {
    final byte kind = DataType.findType(v);
    if (kind == DataType.BAG || kind == DataType.TUPLE
            || kind == DataType.MAP) {
        return null;
    }

    if (kind == DataType.CHARARRAY) {
        return (String) v + "0";
    }
    if (kind == DataType.BYTEARRAY) {
        final String extended = ((DataByteArray) v).toString() + "0";
        return new DataByteArray(extended);
    }
    if (kind == DataType.INTEGER) {
        return Integer.valueOf(((Integer) v).intValue() + 1);
    }
    if (kind == DataType.LONG) {
        return Long.valueOf(((Long) v).longValue() + 1);
    }
    if (kind == DataType.FLOAT) {
        return Float.valueOf(((Float) v).floatValue() + 1);
    }
    if (kind == DataType.DOUBLE) {
        return Double.valueOf(((Double) v).doubleValue() + 1);
    }
    if (kind == DataType.BIGINTEGER) {
        return ((BigInteger) v).add(BigInteger.ONE);
    }
    if (kind == DataType.BIGDECIMAL) {
        return ((BigDecimal) v).add(BigDecimal.ONE);
    }
    if (kind == DataType.DATETIME) {
        final DateTime when = (DateTime) v;
        // Step forward by the finest unit that has a non-zero field.
        if (when.getMillisOfSecond() != 0) {
            return when.plusMillis(1);
        }
        if (when.getSecondOfMinute() != 0) {
            return when.plusSeconds(1);
        }
        if (when.getMinuteOfHour() != 0) {
            return when.plusMinutes(1);
        }
        if (when.getHourOfDay() != 0) {
            return when.plusHours(1);
        }
        return when.plusDays(1);
    }
    return null;
}
/**
 * Converts the textual form {@code data} into an object of the requested
 * Pig data type.
 *
 * <p>Booleans accept only "true" / "false" (case-insensitive) and yield null
 * for anything else. Byte arrays are built from {@code data.getBytes()},
 * which uses the platform default charset. Numeric and datetime types are
 * built with the corresponding valueOf call or constructor, so malformed
 * text surfaces as whatever exception those throw. Types not listed here
 * yield null.
 *
 * @param type one of the DataType type codes
 * @param data textual representation of the value
 * @return the converted value, or null when the type is not handled or a
 *         boolean cannot be parsed
 */
Object generateData(byte type, String data) {
    if (type == DataType.BOOLEAN) {
        if (data.equalsIgnoreCase("true")) {
            return Boolean.TRUE;
        }
        if (data.equalsIgnoreCase("false")) {
            return Boolean.FALSE;
        }
        return null;
    }
    if (type == DataType.BYTEARRAY) {
        return new DataByteArray(data.getBytes());
    }
    if (type == DataType.DOUBLE) {
        return Double.valueOf(data);
    }
    if (type == DataType.FLOAT) {
        return Float.valueOf(data);
    }
    if (type == DataType.INTEGER) {
        return Integer.valueOf(data);
    }
    if (type == DataType.LONG) {
        return Long.valueOf(data);
    }
    if (type == DataType.BIGINTEGER) {
        return new BigInteger(data);
    }
    if (type == DataType.BIGDECIMAL) {
        return new BigDecimal(data);
    }
    if (type == DataType.DATETIME) {
        return new DateTime(data);
    }
    if (type == DataType.CHARARRAY) {
        return data;
    }
    return null;
}
}