blob: 0b08be98f21791d6735987340a6c9609e406ddea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LOCube;
import org.apache.pig.newplan.logical.relational.LODistinct;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LONative;
import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LOStream;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import com.google.common.collect.Lists;
public class Util {
public static LogicalSchema translateSchema(Schema schema) {
if (schema == null) {
return null;
}
LogicalSchema s2 = new LogicalSchema();
List<Schema.FieldSchema> ll = schema.getFields();
for (Schema.FieldSchema f: ll) {
LogicalSchema.LogicalFieldSchema f2 =
new LogicalSchema.LogicalFieldSchema(f.alias, translateSchema(f.schema), f.type);
s2.addField(f2);
}
return s2;
}
public static LogicalSchema.LogicalFieldSchema translateFieldSchema(Schema.FieldSchema fs) {
LogicalSchema newSchema = null;
if (fs.schema!=null) {
newSchema = translateSchema(fs.schema);
}
LogicalSchema.LogicalFieldSchema newFs = new LogicalSchema.LogicalFieldSchema(fs.alias, newSchema, fs.type);
return newFs;
}
/**
* This function translates the new LogicalSchema into old Schema format required
* by PhysicalOperators
* @param schema LogicalSchema to be converted to Schema
* @return Schema that is converted from LogicalSchema
* @throws FrontendException
*/
public static Schema translateSchema(LogicalSchema schema) {
if (schema == null) {
return null;
}
Schema s2 = new Schema();
List<LogicalSchema.LogicalFieldSchema> ll = schema.getFields();
for (LogicalSchema.LogicalFieldSchema f: ll) {
Schema.FieldSchema f2 = null;
try {
f2 = new Schema.FieldSchema(f.alias, translateSchema(f.schema), f.type);
f2.canonicalName = ((Long)f.uid).toString();
s2.add(f2);
} catch (FrontendException e) {
}
}
return s2;
}
/**
* If schema argument has fields where a bag does not contain a tuple schema,
* it inserts a tuple schema. It does so for all inner levels.
* eg bag({int}) => bag({(int)})
* @param sch
* @return modified schema
* @throws FrontendException
*/
public static Schema fixSchemaAddTupleInBag(Schema sch) throws FrontendException{
LogicalSchema logSch = translateSchema(sch);
logSch.normalize();
return translateSchema(logSch);
}
public static Schema.FieldSchema translateFieldSchema(LogicalSchema.LogicalFieldSchema fs) {
if(fs == null)
return null;
Schema newSchema = null;
if (fs.schema!=null) {
newSchema = translateSchema(fs.schema);
}
Schema.FieldSchema newFs = null;
try {
newFs = new Schema.FieldSchema(null, newSchema, fs.type);
} catch (FrontendException e) {
}
return newFs;
}
public static LOForEach addForEachAfter(LogicalPlan plan, LogicalRelationalOperator op, int branch,
Set<Integer> columnsToDrop) throws FrontendException {
LOForEach foreach = new LOForEach(plan);
plan.add(foreach);
List<Operator> next = plan.getSuccessors(op);
if (next != null) {
LogicalRelationalOperator nextOp = (LogicalRelationalOperator)next.get(branch);
plan.insertBetween(op, foreach, nextOp);
foreach.setAlias(op.getAlias());
}
else {
plan.connect(op, foreach);
}
LogicalPlan innerPlan = new LogicalPlan();
foreach.setInnerPlan(innerPlan);
LogicalSchema schema = op.getSchema();
// build foreach inner plan
List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[schema.size()-columnsToDrop.size()]);
innerPlan.add(gen);
for (int i=0, j=0; i<schema.size(); i++) {
if (columnsToDrop.contains(i)) {
continue;
}
LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, i);
innerPlan.add(innerLoad);
innerPlan.connect(innerLoad, gen);
LogicalExpressionPlan exp = new LogicalExpressionPlan();
ProjectExpression prj = new ProjectExpression(exp, j++, -1, gen);
exp.add(prj);
exps.add(exp);
}
return foreach;
}
/**
* Returns a LinkedList of operators contained within the physical plan
* which implement the supplied class, in dependency order. Returns an empty
* LinkedList of no such operators exist.
* @param plan
* @param opClass
* @return a LinkedList of operators contained within the plan which
* implement the supplied class; empty if no such ops exist.
* @throws FrontendException
*/
public static <C extends LogicalRelationalOperator> LinkedList<C> getLogicalRelationalOperators(LogicalPlan plan,
Class<C> opClass) throws FrontendException {
OpFinder<C> finder = new OpFinder<C>(plan, opClass);
finder.visit();
return finder.getFoundOps();
}
private static class OpFinder<C extends LogicalRelationalOperator> extends LogicalRelationalNodesVisitor {
final Class<C> opClass;
private LinkedList<C> foundOps = Lists.newLinkedList();
public OpFinder(LogicalPlan plan, Class<C> opClass) throws FrontendException {
super(plan, new DependencyOrderWalker(plan));
this.opClass = opClass;
}
public LinkedList<C> getFoundOps() {
return foundOps;
}
@SuppressWarnings("unchecked")
private void visitOp(LogicalRelationalOperator op) {
if (opClass.isAssignableFrom(op.getClass())) {
foundOps.add((C) op);
}
}
public void visit(LOLoad load) throws FrontendException {
visitOp(load);
}
@Override
public void visit(LOFilter filter) throws FrontendException {
visitOp(filter);
}
@Override
public void visit(LOStore store) throws FrontendException {
visitOp(store);
}
@Override
public void visit(LOJoin join) throws FrontendException {
visitOp(join);
}
@Override
public void visit(LOForEach foreach) throws FrontendException {
visitOp(foreach);
}
@Override
public void visit(LOGenerate gen) throws FrontendException {
visitOp(gen);
}
@Override
public void visit(LOInnerLoad load) throws FrontendException {
visitOp(load);
}
@Override
public void visit(LOCube cube) throws FrontendException {
visitOp(cube);
}
@Override
public void visit(LOCogroup loCogroup) throws FrontendException {
visitOp(loCogroup);
}
@Override
public void visit(LOSplit loSplit) throws FrontendException {
visitOp(loSplit);
}
@Override
public void visit(LOSplitOutput loSplitOutput) throws FrontendException {
visitOp(loSplitOutput);
}
@Override
public void visit(LOUnion loUnion) throws FrontendException {
visitOp(loUnion);
}
@Override
public void visit(LOSort loSort) throws FrontendException {
visitOp(loSort);
}
@Override
public void visit(LORank loRank) throws FrontendException{
visitOp(loRank);
}
@Override
public void visit(LODistinct loDistinct) throws FrontendException {
visitOp(loDistinct);
}
@Override
public void visit(LOLimit loLimit) throws FrontendException {
visitOp(loLimit);
}
@Override
public void visit(LOCross loCross) throws FrontendException {
visitOp(loCross);
}
@Override
public void visit(LOStream loStream) throws FrontendException {
visitOp(loStream);
}
@Override
public void visit(LONative nativeMR) throws FrontendException{
visitOp(nativeMR);
}
}
}