blob: 5d6a3a1e7cd2cfaedf4a1b9ece2b13d84a2fc567 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.pig.newplan.logical.optimizer;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.pig.PigException;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.PlanValidationException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanWalker;
import org.apache.pig.newplan.ReverseDependencyOrderWalker;
import org.apache.pig.newplan.logical.expression.AllSameExpressionVisitor;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LOCube;
import org.apache.pig.newplan.logical.relational.LODistinct;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LOStream;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
public class SchemaResetter extends LogicalRelationalNodesVisitor {
// uid duplicates are removed only after optimizer rule
// DuplicateForEachColumnRewrite has run. So disable it in calls before that
boolean skipDuplicateUidCheck = true;
private void visitAll(Collection<LogicalExpressionPlan> lexpPlans) throws FrontendException {
for (LogicalExpressionPlan expPlan : lexpPlans) {
FieldSchemaResetter fsResetter = new FieldSchemaResetter(expPlan);
fsResetter.visit();
}
}
public SchemaResetter(OperatorPlan plan) throws FrontendException {
this(plan, false);
}
public SchemaResetter(OperatorPlan plan, boolean skipDuplicateUidCheck)
throws FrontendException {
super(plan, new DependencyOrderWalker(plan));
this.skipDuplicateUidCheck = skipDuplicateUidCheck;
}
@Override
public void visit(LOLoad load) throws FrontendException {
load.resetSchema();
validate(load.getSchema());
}
@Override
public void visit(LOFilter filter) throws FrontendException {
filter.resetSchema();
FieldSchemaResetter fsResetter = new FieldSchemaResetter(filter.getFilterPlan());
fsResetter.visit();
validate(filter.getSchema());
}
@Override
public void visit(LOStore store) throws FrontendException {
store.resetSchema();
validate(store.getSchema());
}
@Override
public void visit(LOJoin join) throws FrontendException {
join.resetSchema();
visitAll(join.getExpressionPlanValues());
validate(join.getSchema());
}
@Override
public void visit(LOForEach foreach) throws FrontendException {
foreach.resetSchema();
OperatorPlan innerPlan = foreach.getInnerPlan();
PlanWalker newWalker = currentWalker.spawnChildWalker(innerPlan);
pushWalker(newWalker);
currentWalker.walk(this);
popWalker();
validate(foreach.getSchema());
}
@Override
public void visit(LOGenerate gen) throws FrontendException {
gen.resetSchema();
visitAll(gen.getOutputPlans());
validate(gen.getSchema());
}
@Override
public void visit(LOInnerLoad load) throws FrontendException {
load.resetSchema();
load.getProjection().resetFieldSchema();
load.getSchema();
}
@Override
public void visit(LOCube loCube) throws FrontendException {
loCube.resetSchema();
visitAll(loCube.getExpressionPlans().values());
validate(loCube.getSchema());
}
@Override
public void visit(LOCogroup loCogroup) throws FrontendException {
loCogroup.resetSchema();
visitAll(loCogroup.getExpressionPlans().values());
validate(loCogroup.getSchema());
}
@Override
public void visit(LOSplit loSplit) throws FrontendException {
loSplit.resetSchema();
validate(loSplit.getSchema());
}
@Override
public void visit(LOSplitOutput loSplitOutput) throws FrontendException {
loSplitOutput.resetSchema();
FieldSchemaResetter fsResetter = new FieldSchemaResetter(loSplitOutput.getFilterPlan());
fsResetter.visit();
validate(loSplitOutput.getSchema());
}
@Override
public void visit(LOUnion loUnion) throws FrontendException {
loUnion.resetSchema();
validate(loUnion.getSchema());
}
@Override
public void visit(LOSort loSort) throws FrontendException {
loSort.resetSchema();
visitAll(loSort.getSortColPlans());
validate(loSort.getSchema());
}
@Override
public void visit(LORank loRank) throws FrontendException{
loRank.resetSchema();
visitAll(loRank.getRankColPlans());
validate(loRank.getSchema());
}
@Override
public void visit(LODistinct loDistinct) throws FrontendException {
loDistinct.resetSchema();
validate(loDistinct.getSchema());
}
@Override
public void visit(LOLimit loLimit) throws FrontendException {
loLimit.resetSchema();
if (loLimit.getLimitPlan() != null) {
FieldSchemaResetter fsResetter = new FieldSchemaResetter(
loLimit.getLimitPlan());
fsResetter.visit();
}
validate(loLimit.getSchema());
}
@Override
public void visit(LOCross loCross) throws FrontendException {
loCross.resetSchema();
validate(loCross.getSchema());
}
@Override
public void visit(LOStream loStream) throws FrontendException {
loStream.resetSchema();
validate(loStream.getSchema());
}
/**
* Check if schema is valid (ready to be part of a final logical plan)
* @param schema
* @throws PlanValidationException if the if any field in schema has uid -1
* or (skipDuplicateUidCheck is true and there are duplicate uids in schema)
*/
public void validate(LogicalSchema schema)
throws PlanValidationException{
if(schema == null)
return;
Set<Long> uidsSeen = new HashSet<Long>();
for(LogicalFieldSchema fs : schema.getFields()){
if(!skipDuplicateUidCheck){
//check duplicate uid
if(!uidsSeen.add(fs.uid)){
// uid already seen
String msg = "Logical plan invalid state: duplicate uid in " +
"schema : " + schema;
throw new PlanValidationException(
msg,
2270,
PigException.BUG
);
}
}
if(fs.uid < 0){
String msg = "Logical plan invalid state: invalid uid " + fs.uid +
" in schema : " + schema;
throw new PlanValidationException(
msg,
2271,
PigException.BUG
);
}
}
}
}
class FieldSchemaResetter extends AllSameExpressionVisitor {
protected FieldSchemaResetter(OperatorPlan p) throws FrontendException {
super(p, new ReverseDependencyOrderWalker(p));
}
@Override
protected void execute(LogicalExpression op) throws FrontendException {
op.resetFieldSchema();
op.getFieldSchema();
}
}