| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.pig.parser; |
| |
| import java.io.IOException; |
| import java.math.BigDecimal; |
| import java.math.BigInteger; |
| import java.net.MalformedURLException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import org.antlr.runtime.IntStream; |
| import org.antlr.runtime.RecognitionException; |
| import org.apache.pig.ExecType; |
| import org.apache.pig.FuncSpec; |
| import org.apache.pig.LoadFunc; |
| import org.apache.pig.NonFSLoadFunc; |
| import org.apache.pig.PigConfiguration; |
| import org.apache.pig.StoreFuncInterface; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; |
| import org.apache.pig.builtin.Assert; |
| import org.apache.pig.builtin.CubeDimensions; |
| import org.apache.pig.builtin.InvokerGenerator; |
| import org.apache.pig.builtin.PigStorage; |
| import org.apache.pig.builtin.RANDOM; |
| import org.apache.pig.builtin.RollupDimensions; |
| import org.apache.pig.data.BagFactory; |
| import org.apache.pig.data.DataBag; |
| import org.apache.pig.data.DataType; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.data.TupleFactory; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.io.FileSpec; |
| import org.apache.pig.impl.logicalLayer.FrontendException; |
| import org.apache.pig.impl.plan.NodeIdGenerator; |
| import org.apache.pig.impl.plan.OperatorKey; |
| import org.apache.pig.impl.streaming.StreamingCommand; |
| import org.apache.pig.impl.streaming.StreamingCommand.Handle; |
| import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; |
| import org.apache.pig.impl.util.MultiMap; |
| import org.apache.pig.impl.util.StringUtils; |
| import org.apache.pig.newplan.Operator; |
| import org.apache.pig.newplan.OperatorPlan; |
| import org.apache.pig.newplan.logical.expression.BinCondExpression; |
| import org.apache.pig.newplan.logical.expression.ConstantExpression; |
| import org.apache.pig.newplan.logical.expression.IsNullExpression; |
| import org.apache.pig.newplan.logical.expression.LessThanExpression; |
| import org.apache.pig.newplan.logical.expression.LogicalExpression; |
| import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan; |
| import org.apache.pig.newplan.logical.expression.NotExpression; |
| import org.apache.pig.newplan.logical.expression.OrExpression; |
| import org.apache.pig.newplan.logical.expression.ProjectExpression; |
| import org.apache.pig.newplan.logical.expression.UserFuncExpression; |
| import org.apache.pig.newplan.logical.optimizer.SchemaResetter; |
| import org.apache.pig.newplan.logical.relational.LOCogroup; |
| import org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE; |
| import org.apache.pig.newplan.logical.relational.LOCross; |
| import org.apache.pig.newplan.logical.relational.LOCube; |
| import org.apache.pig.newplan.logical.relational.LODistinct; |
| import org.apache.pig.newplan.logical.relational.LOFilter; |
| import org.apache.pig.newplan.logical.relational.LOForEach; |
| import org.apache.pig.newplan.logical.relational.LOGenerate; |
| import org.apache.pig.newplan.logical.relational.LOInnerLoad; |
| import org.apache.pig.newplan.logical.relational.LOJoin; |
| import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE; |
| import org.apache.pig.newplan.logical.relational.LOLimit; |
| import org.apache.pig.newplan.logical.relational.LOLoad; |
| import org.apache.pig.newplan.logical.relational.LONative; |
| import org.apache.pig.newplan.logical.relational.LORank; |
| import org.apache.pig.newplan.logical.relational.LOSort; |
| import org.apache.pig.newplan.logical.relational.LOSplit; |
| import org.apache.pig.newplan.logical.relational.LOSplitOutput; |
| import org.apache.pig.newplan.logical.relational.LOStore; |
| import org.apache.pig.newplan.logical.relational.LOStream; |
| import org.apache.pig.newplan.logical.relational.LOUnion; |
| import org.apache.pig.newplan.logical.relational.LogicalPlan; |
| import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; |
| import org.apache.pig.newplan.logical.relational.LogicalSchema; |
| import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema; |
| import org.apache.pig.newplan.logical.rules.OptimizerUtils; |
| import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander; |
| import org.apache.pig.newplan.logical.visitor.ProjectStarExpander; |
| import org.apache.pig.newplan.logical.visitor.ResetProjectionAttachedRelationalOpVisitor; |
| import org.apache.pig.validator.BlackAndWhitelistFilter; |
| import org.apache.pig.validator.PigCommandFilter; |
| |
| public class LogicalPlanBuilder { |
| |
| private LogicalPlan plan = new LogicalPlan(); |
| |
| private String lastRel = null; |
| |
| private Map<String, Operator> operators = new HashMap<String, Operator>() { |
| @Override |
| public Operator put(String k, Operator v) { |
| lastRel = k; |
| return super.put(k, v); |
| } |
| }; |
| |
| Map<String, String> fileNameMap; |
| |
| private PigContext pigContext = null; |
| private String scope = null; |
| private IntStream intStream; |
| private int storeIndex = 0; |
| private int loadIndex = 0; |
| |
| private final BlackAndWhitelistFilter filter; |
| |
| private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator(); |
| |
| public static long getNextId(String scope) { |
| return nodeIdGen.getNextNodeId( scope ); |
| } |
| |
| LogicalPlanBuilder(PigContext pigContext, String scope, Map<String, String> fileNameMap, |
| IntStream input) { |
| this.pigContext = pigContext; |
| this.scope = scope; |
| this.fileNameMap = fileNameMap; |
| this.intStream = input; |
| this.filter = new BlackAndWhitelistFilter(this.pigContext); |
| } |
| |
| LogicalPlanBuilder(IntStream input) throws ExecException { |
| pigContext = new PigContext( ExecType.LOCAL, new Properties() ); |
| pigContext.connect(); |
| this.scope = "test"; |
| this.fileNameMap = new HashMap<String, String>(); |
| this.intStream = input; |
| this.filter = new BlackAndWhitelistFilter(this.pigContext); |
| } |
| |
| Operator lookupOperator(String alias) { |
| return operators.get( alias ); |
| } |
| |
| FuncSpec lookupFunction(String alias) { |
| return pigContext.getFuncSpecFromAlias( alias ); |
| } |
| |
| StreamingCommand lookupCommand(String alias) { |
| return pigContext.getCommandForAlias( alias ); |
| } |
| |
| void defineCommand(String alias, StreamingCommand command) { |
| try { |
| filter.validate(PigCommandFilter.Command.DEFINE); |
| } catch (FrontendException e) { |
| throw new RuntimeException(e.getMessage()); |
| } |
| pigContext.registerStreamCmd( alias, command ); |
| } |
| |
| void defineFunction(String alias, FuncSpec fs) { |
| try { |
| filter.validate(PigCommandFilter.Command.DEFINE); |
| } catch (FrontendException e) { |
| throw new RuntimeException(e); |
| } |
| pigContext.registerFunction( alias, fs ); |
| } |
| |
| LogicalPlan getPlan() { |
| return plan; |
| } |
| |
| Map<String, Operator> getOperators() { |
| return operators; |
| } |
| |
| LOFilter createFilterOp() { |
| return new LOFilter( plan ); |
| } |
| |
| LOLimit createLimitOp() { |
| return new LOLimit( plan ); |
| } |
| |
| LOFilter createSampleOp() { |
| return new LOFilter( plan, true ); |
| } |
| |
| String buildFilterOp(SourceLocation loc, LOFilter op, String alias, |
| String inputAlias, LogicalExpressionPlan expr) |
| throws ParserValidationException { |
| |
| op.setFilterPlan( expr ); |
| alias = buildOp( loc, op, alias, inputAlias, null ); // it should actually return same alias |
| try { |
| (new ProjStarInUdfExpander(op.getPlan())).visit(op); |
| new SchemaResetter(op.getPlan(), true).visit(op); |
| } catch (FrontendException e) { |
| throw new ParserValidationException( intStream, loc, e ); |
| } |
| return alias; |
| } |
| |
| String buildDistinctOp(SourceLocation loc, String alias, String inputAlias, String partitioner) throws ParserValidationException { |
| LODistinct op = new LODistinct( plan ); |
| return buildOp( loc, op, alias, inputAlias, partitioner ); |
| } |
| |
| String buildLimitOp(SourceLocation loc, String alias, String inputAlias, long limit) throws ParserValidationException { |
| LOLimit op = new LOLimit( plan, limit ); |
| return buildOp( loc, op, alias, inputAlias, null ); |
| } |
| |
| String buildLimitOp(SourceLocation loc, LOLimit op, String alias, String inputAlias, LogicalExpressionPlan expr) throws ParserValidationException { |
| op.setLimitPlan(expr); |
| return buildOp(loc, op, alias, inputAlias, null); |
| } |
| |
| String buildSampleOp(SourceLocation loc, String alias, String inputAlias, double value, |
| SourceLocation valLoc) |
| throws ParserValidationException { |
| |
| LogicalExpressionPlan filterPlan = new LogicalExpressionPlan(); |
| // Generate a filter condition. |
| LogicalExpression konst = new ConstantExpression( filterPlan, value); |
| konst.setLocation( valLoc ); |
| UserFuncExpression udf = new UserFuncExpression( filterPlan, new FuncSpec( RANDOM.class.getName() ) ); |
| new LessThanExpression( filterPlan, udf, konst ); |
| LOFilter filter = new LOFilter( plan, true ); |
| return buildFilterOp( loc, filter, alias, inputAlias, filterPlan ); |
| } |
| |
| String buildSampleOp(SourceLocation loc, LOFilter filter, String alias, String inputAlias, |
| LogicalExpressionPlan samplePlan, LogicalExpression expr) |
| throws ParserValidationException { |
| |
| UserFuncExpression udf = new UserFuncExpression( samplePlan, new FuncSpec( RANDOM.class.getName() ) ); |
| new LessThanExpression( samplePlan, udf, expr ); |
| return buildFilterOp( loc, filter, alias, inputAlias, samplePlan ); |
| } |
| |
| String buildUnionOp(SourceLocation loc, String alias, List<String> inputAliases, boolean onSchema) throws ParserValidationException { |
| checkDuplicateAliases(inputAliases, loc, "UNION"); |
| LOUnion op = new LOUnion( plan, onSchema ); |
| return buildOp( loc, op, alias, inputAliases, null ); |
| } |
| |
| String buildSplitOp(SourceLocation loc, String inputAlias) throws ParserValidationException { |
| LOSplit op = new LOSplit( plan ); |
| return buildOp( loc, op, null, inputAlias, null ); |
| } |
| |
| LOSplitOutput createSplitOutputOp() { |
| return new LOSplitOutput( plan ); |
| } |
| |
| String buildSplitOutputOp(SourceLocation loc, LOSplitOutput op, String alias, String inputAlias, |
| LogicalExpressionPlan filterPlan) throws ParserValidationException { |
| op.setFilterPlan( filterPlan ); |
| return buildOp ( loc, op, alias, inputAlias, null ); |
| } |
| |
| String buildSplitOtherwiseOp(SourceLocation loc, LOSplitOutput op, String alias, String inputAlias, boolean allowNulls) |
| throws ParserValidationException, PlanGenerationFailureException { |
| LogicalExpressionPlan splitPlan = new LogicalExpressionPlan(); |
| Operator losplit = lookupOperator(inputAlias); |
| LogicalExpression currentExpr = null; |
| for (Operator losplitoutput : plan.getSuccessors(losplit)) { |
| // take all the LOSplitOutput and negate their filter plans |
| LogicalExpressionPlan fragment = ((LOSplitOutput) losplitoutput) |
| .getFilterPlan(); |
| try { |
| if (OptimizerUtils.planHasNonDeterministicUdf(fragment)) |
| throw new ParserValidationException( |
| intStream, loc, new FrontendException(op, |
| "Can not use Otherwise in Split with an expression containing a @Nondeterministic UDF", 1131)); |
| } catch (FrontendException e) { |
| e.printStackTrace(); |
| throw new PlanGenerationFailureException(intStream, loc, e); |
| } |
| LogicalExpression root = null; |
| try { |
| // get the root expression of the filter plan in LOSplitOutput and copy it |
| root = ((LogicalExpression) fragment.getSources().get(0)) |
| .deepCopy(splitPlan); |
| } catch (FrontendException e) { |
| e.printStackTrace(); |
| throw new PlanGenerationFailureException(intStream, loc, e); |
| } |
| if (root == null) |
| throw new PlanGenerationFailureException(intStream, loc, |
| new FrontendException(op, |
| "Could not retrieve LogicalExpression for LOSplitOutput " + losplitoutput, 2048)); |
| if (currentExpr == null) |
| currentExpr = root; |
| else |
| currentExpr = new OrExpression(splitPlan, currentExpr, root); |
| } |
| // using De Morgan's law (!A && !B) == !(A || B) |
| currentExpr = new NotExpression(splitPlan, currentExpr); |
| if (allowNulls) { |
| // add IsNull expression to the condition: i.e. (A || B) IS NULL OR !(A || B). |
| // this is needed to catch null values in otherwise branch. |
| LogicalExpression isNull = new IsNullExpression( splitPlan, currentExpr ); |
| currentExpr = new OrExpression(splitPlan, isNull, currentExpr); |
| } |
| try { |
| // Going through all the ProjectExpressions that were cloned |
| // and updating the attached operators from its original |
| // LOSplitOutput to to the "otherwise" LOSplitOutput |
| // (PIG-3641) |
| new ResetProjectionAttachedRelationalOpVisitor(splitPlan, op).visit(); |
| } catch (FrontendException e) { |
| e.printStackTrace(); |
| throw new PlanGenerationFailureException(intStream, loc, e); |
| } |
| |
| op.setFilterPlan(splitPlan); |
| return buildOp(loc, op, alias, inputAlias, null); |
| } |
| |
| String buildCrossOp(SourceLocation loc, String alias, List<String> inputAliases, String partitioner) throws ParserValidationException { |
| LOCross op = new LOCross( plan ); |
| return buildOp ( loc, op, alias, inputAliases, partitioner ); |
| } |
| |
| LOSort createSortOp() { |
| return new LOSort( plan ); |
| } |
| |
| String buildSortOp(SourceLocation loc, LOSort sort, String alias, String inputAlias, List<LogicalExpressionPlan> plans, |
| List<Boolean> ascFlags, FuncSpec fs) throws ParserValidationException { |
| sort.setSortColPlans( plans ); |
| sort.setUserFunc( fs ); |
| if (ascFlags.isEmpty()) { |
| for (int i=0;i<plans.size();i++) |
| ascFlags.add(true); |
| } |
| sort.setAscendingCols( ascFlags ); |
| alias = buildOp( loc, sort, alias, inputAlias, null ); |
| try { |
| (new ProjectStarExpander(sort.getPlan())).visit(sort); |
| (new ProjStarInUdfExpander(sort.getPlan())).visit(sort); |
| new SchemaResetter(sort.getPlan(), true).visit(sort); |
| } catch (FrontendException e ) { |
| throw new ParserValidationException( intStream, loc, e ); |
| } |
| return alias; |
| } |
| |
| LORank createRankOp() { |
| return new LORank( plan ); |
| } |
| |
| String buildRankOp(SourceLocation loc, LORank rank, String alias, String inputAlias, List<LogicalExpressionPlan> plans, |
| List<Boolean> ascFlags) throws ParserValidationException { |
| |
| //Rank |
| rank.setRankColPlan(plans); |
| if (ascFlags.isEmpty()) { |
| for (int i=0;i<plans.size();i++) |
| ascFlags.add(true); |
| } |
| rank.setAscendingCol(ascFlags); |
| |
| buildOp( loc, rank, alias, inputAlias, null ); |
| try { |
| (new ProjectStarExpander(rank.getPlan())).visit(rank); |
| (new ProjStarInUdfExpander(rank.getPlan())).visit(rank); |
| new SchemaResetter(rank.getPlan(), true).visit(rank); |
| } catch (FrontendException e ) { |
| throw new ParserValidationException( intStream, loc, e ); |
| } |
| |
| return alias; |
| } |
| |
| LOJoin createJoinOp() { |
| return new LOJoin( plan ); |
| } |
| |
| String buildJoinOp(SourceLocation loc, LOJoin op, String alias, List<String> inputAliases, |
| MultiMap<Integer, LogicalExpressionPlan> joinPlans, |
| JOINTYPE jt, List<Boolean> innerFlags, String partitioner) |
| throws ParserValidationException { |
| checkDuplicateAliases(inputAliases, loc, "JOIN"); |
| if (jt==null) |
| jt = JOINTYPE.HASH; |
| else { |
| op.pinOption(LOJoin.OPTION_JOIN); |
| } |
| |
| int inputCount = inputAliases.size(); |
| |
| if( jt == JOINTYPE.SKEWED ) { |
| if( partitioner != null ) { |
| throw new ParserValidationException( intStream, loc, |
| "Custom Partitioner is not supported for skewed join" ); |
| } |
| |
| if( inputCount != 2 ) { |
| throw new ParserValidationException( intStream, loc, |
| "Skewed join can only be applied for 2-way joins" ); |
| } |
| } else if( (jt == JOINTYPE.MERGE || jt == JOINTYPE.MERGESPARSE) && inputCount != 2 ) { |
| throw new ParserValidationException( intStream, loc, |
| "Merge join can only be applied for 2-way joins" ); |
| } else if( jt == JOINTYPE.REPLICATED ) { |
| if( innerFlags.size() == 2 && innerFlags.get( 0 ) == false ) { |
| throw new ParserValidationException( intStream, loc, |
| "Replicated join does not support (right|full) outer joins" ); |
| } |
| } |
| |
| boolean[] flags = new boolean[joinPlans.size()]; |
| if (innerFlags.size()!=0) { |
| for( int i = 0; i < joinPlans.size(); i++ ) { |
| flags[i] = innerFlags.get( i ); |
| } |
| } |
| else { |
| for( int i = 0; i < joinPlans.size(); i++ ) { |
| flags[i] = true; |
| } |
| } |
| op.setJoinType( jt ); |
| op.setInnerFlags( flags ); |
| op.setJoinPlans( joinPlans ); |
| alias = buildOp( loc, op, alias, inputAliases, partitioner ); |
| try { |
| (new ProjectStarExpander(op.getPlan())).visit(op); |
| (new ProjStarInUdfExpander(op.getPlan())).visit(op); |
| new SchemaResetter(op.getPlan(), true).visit(op); |
| } catch (FrontendException e ) { |
| throw new ParserValidationException( intStream, loc, e ); |
| } |
| return alias; |
| } |
| |
| LOCube createCubeOp() { |
| return new LOCube(plan); |
| } |
| |
| String buildCubeOp(SourceLocation loc, LOCube op, String alias, String inputAlias, |
| List<String> operations, MultiMap<Integer, LogicalExpressionPlan> expressionPlans) |
| throws ParserValidationException { |
| |
| // check if continuously occurring cube operations be combined |
| combineCubeOperations((ArrayList<String>) operations, expressionPlans); |
| |
| // set the expression plans for cube operator and build cube operator |
| op.setExpressionPlans(expressionPlans); |
| op.setOperations(operations); |
| buildOp(loc, op, alias, inputAlias, null); |
| try { |
| (new ProjectStarExpander(op.getPlan())).visit(op); |
| (new ProjStarInUdfExpander(op.getPlan())).visit(op); |
| new SchemaResetter(op.getPlan(), true).visit(op); |
| } catch (FrontendException e ) { |
| throw new ParserValidationException( intStream, loc, e ); |
| } |
| try { |
| alias = convertCubeToFGPlan(loc, op, inputAlias, operations, expressionPlans); |
| } catch (FrontendException e) { |
| throw new ParserValidationException(intStream, loc, e); |
| } |
| return alias; |
| } |
| |
| // if multiple CUBE operations occur continuously then it can be combined |
| // together CUBE rel BY CUBE(a,b), CUBE(c,d); => CUBE rel BY CUBE(a,b,c,d) |
| private void combineCubeOperations(ArrayList<String> operations, |
| MultiMap<Integer, LogicalExpressionPlan> expressionPlans) { |
| |
| int startIdx = -1; |
| int endIdx = -1; |
| int i = 0; |
| boolean isMerged = false; |
| |
| // scan and perform merge of column projections |
| for (i = 0; i < operations.size(); i++) { |
| if ((startIdx == -1) && (operations.get(i).equals("CUBE") == true)) { |
| startIdx = i; |
| } else { |
| if (operations.get(i).equals("CUBE") == true) { |
| endIdx = i; |
| } else { |
| if (endIdx > startIdx) { |
| mergeAndMarkForDelete(operations, expressionPlans, startIdx, endIdx); |
| isMerged = true; |
| startIdx = -1; |
| endIdx = -1; |
| } else { |
| startIdx = -1; |
| endIdx = -1; |
| } |
| } |
| } |
| } |
| |
| // this check is required for the case when the sequence of CUBE |
| // operations occurs at the end, like (CUBE, ROLLUP, CUBE, CUBE) |
| // in which case endIdx will be greater than startIdx |
| if (endIdx > startIdx) { |
| isMerged = true; |
| mergeAndMarkForDelete(operations, expressionPlans, startIdx, endIdx); |
| } |
| |
| // if merged then remove the column projections that were marked for |
| // deletion |
| if (isMerged) { |
| performDeletion(expressionPlans, operations); |
| } |
| } |
| |
| private void performDeletion(MultiMap<Integer, LogicalExpressionPlan> expressionPlans, |
| ArrayList<String> operations) { |
| |
| MultiMap<Integer, LogicalExpressionPlan> ep = new MultiMap<Integer, LogicalExpressionPlan>(); |
| List<String> op = new ArrayList<String>(); |
| int idx = 0; |
| // rearranging indices |
| for (int i = 0; i < operations.size(); i++) { |
| if (operations.get(i) != null) { |
| op.add(idx, operations.get(i)); |
| } |
| |
| if (expressionPlans.get(i) != null) { |
| ep.put(idx, expressionPlans.get(i)); |
| idx++; |
| } |
| } |
| |
| // performing deletions |
| operations.clear(); |
| operations.addAll(op); |
| |
| expressionPlans.clear(); |
| for (Integer i : ep.keySet()) { |
| expressionPlans.put(i, ep.get(i)); |
| } |
| } |
| |
| // performs merging of dimensions of merged cube operation |
| // Ex: CUBE(a,b), CUBE(c,d) ==> CUBE(a,b,c,d) |
| // in the above example CUBE operator and dimensions are merged |
| private void mergeAndMarkForDelete(ArrayList<String> operations, |
| MultiMap<Integer, LogicalExpressionPlan> expressionPlans, int startIdx, int endIdx) { |
| // mark for delete |
| for (int i = startIdx + 1; i <= endIdx; i++) { |
| expressionPlans.put(startIdx, expressionPlans.get(i)); |
| expressionPlans.removeKey(i); |
| operations.remove(i); |
| operations.add(i, null); |
| } |
| } |
| |
| // This function creates logical plan for foreach and groupby operators. |
| // It connects the predecessors of cube operator with foreach plan and |
| // disconnects cube operator from the logical plan. It also connects foreach |
| // plan with groupby plan. |
| private String convertCubeToFGPlan(SourceLocation loc, LOCube op, String inputAlias, |
| List<String> operations, MultiMap<Integer, LogicalExpressionPlan> expressionPlans) |
| throws FrontendException { |
| |
| LOForEach foreach = new LOForEach(plan); |
| LOCogroup groupby = new LOCogroup(plan); |
| LogicalPlan innerPlan = new LogicalPlan(); |
| LogicalRelationalOperator gen = new LOGenerate(innerPlan); |
| |
| injectForeachOperator(loc, op, foreach); |
| |
| // Get all column attributes from the input relation. |
| // Create ProjectExpression for all columns. Based on the |
| // dimensions specified by the user, specified columns will be attached |
| // to CubeDimension/RollupDimension UDF and rest will be pushed down |
| List<Operator> inpOpers = foreach.getPlan().getPredecessors(foreach); |
| List<LogicalExpressionPlan> allExprPlan = new ArrayList<LogicalExpressionPlan>(); |
| for (Operator oper : inpOpers) { |
| LogicalSchema schema = new LogicalSchema(); |
| schema = ((LogicalRelationalOperator) oper).getSchema(); |
| |
| if (schema != null) { |
| ArrayList<LogicalFieldSchema> fields = (ArrayList<LogicalFieldSchema>) schema |
| .getFields(); |
| for (int i = 0; i < fields.size(); i++) { |
| LogicalExpressionPlan lEplan = new LogicalExpressionPlan(); |
| new ProjectExpression(lEplan, i, fields.get(i).alias, null, gen); |
| allExprPlan.add(lEplan); |
| } |
| } |
| } |
| |
| // iterate over all operations and generate corresponding UDFs |
| for (int operIdx = 0; operIdx < operations.size(); operIdx++) { |
| List<LogicalExpressionPlan> lexpPlanList = new ArrayList<LogicalExpressionPlan>(); |
| List<LogicalExpression> lexpList = new ArrayList<LogicalExpression>(); |
| |
| lexpPlanList.addAll(expressionPlans.get(operIdx)); |
| |
| // If duplicates exists in the dimension list then exception is |
| // thrown |
| checkDuplicateProject(lexpPlanList); |
| |
| // Construct ProjectExpression from the LogicalExpressionPlans |
| lexpList = getProjectExpList(lexpPlanList, gen); |
| |
| for (int i = 0; i < lexpList.size(); i++) { |
| // Retain the columns that needs to be pushed down. |
| // Remove the dimension columns from the input column list |
| // as it will be attached to CubeDimension UDF |
| for (int j = 0; j < allExprPlan.size(); j++) { |
| LogicalExpression lexp = (LogicalExpression) allExprPlan.get(j).getSources() |
| .get(0); |
| String colAlias = ((ProjectExpression) lexpList.get(i)).getColAlias(); |
| if (colAlias == null) { |
| colAlias = ((ProjectExpression) lexpList.get(i)).getFieldSchema().alias; |
| } |
| |
| String projExpAlias = null; |
| try { |
| projExpAlias = ((ProjectExpression) lexp).getColAlias(); |
| } catch (ClassCastException e) { |
| // if it is not projection then it should be |
| // UserFuncExpr. |
| // ignore and continue till next ProjExpr is encountered |
| continue; |
| } |
| if (colAlias.equals(projExpAlias) == true) { |
| allExprPlan.remove(j); |
| } else { |
| // if projected exp alias is a namespaced alias |
| if (projExpAlias.lastIndexOf(":") != -1) { |
| projExpAlias = projExpAlias.substring( |
| projExpAlias.lastIndexOf(":") + 1, projExpAlias.length()); |
| if (colAlias.equals(projExpAlias) == true) { |
| allExprPlan.remove(j); |
| } |
| } |
| } |
| } |
| } |
| |
| // Create UDF with user specified dimensions |
| LogicalExpressionPlan uexpPlan = new LogicalExpressionPlan(); |
| if (operations.get(operIdx).equals("CUBE")) { |
| new UserFuncExpression(uexpPlan, new FuncSpec(CubeDimensions.class.getName()), |
| lexpList); |
| } else { |
| new UserFuncExpression(uexpPlan, new FuncSpec(RollupDimensions.class.getName()), |
| lexpList); |
| } |
| |
| for (LogicalExpressionPlan lexp : lexpPlanList) { |
| Iterator<Operator> it = lexp.getOperators(); |
| while (it.hasNext()) { |
| uexpPlan.add(it.next()); |
| } |
| } |
| // Add the UDF to logical expression plan that contains dependent |
| // attributes (pushed down from input columns) |
| allExprPlan.add(operIdx, uexpPlan); |
| } |
| |
| // If the operator is a UserFuncExpression then set the flatten flags. |
| List<Boolean> flattenFlags = new ArrayList<Boolean>(); |
| for (int idx = 0; idx < allExprPlan.size(); idx++) { |
| List<Operator> opers = allExprPlan.get(idx).getSources(); |
| for (Operator oper : opers) { |
| if (oper instanceof ProjectExpression) { |
| flattenFlags.add(false); |
| } else if (oper instanceof UserFuncExpression) { |
| flattenFlags.add(true); |
| } |
| } |
| } |
| |
| // Generate and Foreach operator creation |
| String falias = null; |
| try { |
| buildGenerateOp(loc, (LOForEach) foreach, (LOGenerate) gen, allExprPlan, |
| flattenFlags, getUserDefinedSchema(allExprPlan)); |
| falias = buildForeachOp(loc, (LOForEach) foreach, "cube", inputAlias, innerPlan); |
| } catch (ParserValidationException pve) { |
| throw new FrontendException(pve); |
| } |
| |
| List<Boolean> innerFlags = new ArrayList<Boolean>(); |
| List<String> inpAliases = new ArrayList<String>(); |
| inpAliases.add(falias); |
| innerFlags.add(false); |
| |
| // Get the output schema of foreach operator and reconstruct the |
| // LogicalExpressionPlan for each dimensional attributes |
| MultiMap<Integer, LogicalExpressionPlan> exprPlansCopy = new MultiMap<Integer, LogicalExpressionPlan>(); |
| |
| for (LogicalExpressionPlan exp : expressionPlans.values()) { |
| LogicalExpression lexp = (LogicalExpression) exp.getSources().get(0); |
| LogicalExpressionPlan epGrp = new LogicalExpressionPlan(); |
| new ProjectExpression(epGrp, 0, lexp.getFieldSchema().alias, null, groupby); |
| exprPlansCopy.put(0, epGrp); |
| } |
| |
| // build group by operator |
| try { |
| return buildGroupOp(loc, (LOCogroup) groupby, op.getAlias(), inpAliases, exprPlansCopy, |
| GROUPTYPE.REGULAR, innerFlags, null); |
| } catch (ParserValidationException pve) { |
| throw new FrontendException(pve); |
| } |
| } |
| |
| // User defined schema for generate operator. If not specified output schema |
| // of UDF will be used which will prefix "dimensions" namespace to all fields |
| private List<LogicalSchema> getUserDefinedSchema(List<LogicalExpressionPlan> allExprPlan) |
| throws FrontendException { |
| |
| List<LogicalSchema> genOutputSchema = new ArrayList<LogicalSchema>(); |
| for (int i = 0; i < allExprPlan.size(); i++) { |
| List<Operator> opers = allExprPlan.get(i).getSources(); |
| for (Operator oper : opers) { |
| |
| // add a logical schema for dimensions that are pushed from |
| // predecessor of cube/rollup |
| if (oper instanceof ProjectExpression) { |
| LogicalSchema output = new LogicalSchema(); |
| output.addField(new LogicalFieldSchema( |
| ((ProjectExpression) oper).getColAlias(), null, DataType.NULL)); |
| genOutputSchema.add(output); |
| } else if (oper instanceof UserFuncExpression) { |
| // add logical schema for dimensions specified in |
| // cube/rollup operator |
| LogicalSchema output = new LogicalSchema(); |
| for (Operator op : ((UserFuncExpression) oper).getPlan().getSinks()) { |
| output.addField(new LogicalFieldSchema(((ProjectExpression) op) |
| .getFieldSchema())); |
| } |
| genOutputSchema.add(output); |
| } |
| |
| } |
| } |
| return genOutputSchema; |
| } |
| |
| private List<LogicalExpression> getProjectExpList(List<LogicalExpressionPlan> lexpPlanList, |
| LogicalRelationalOperator lro) throws FrontendException { |
| |
| List<LogicalExpression> leList = new ArrayList<LogicalExpression>(); |
| for (int i = 0; i < lexpPlanList.size(); i++) { |
| LogicalExpressionPlan lexp = lexpPlanList.get(i); |
| LogicalExpression lex = (LogicalExpression) lexp.getSources().get(0); |
| Iterator<Operator> opers = lexp.getOperators(); |
| |
| // ProjExpr are initially attached to CubeOp. So re-attach it to |
| // specified operator |
| while (opers.hasNext()) { |
| Operator oper = opers.next(); |
| try { |
| ((ProjectExpression) oper).setAttachedRelationalOp(lro); |
| } catch (ClassCastException cce) { |
| throw new FrontendException("Column project expected.", cce); |
| } |
| } |
| |
| leList.add(lex); |
| } |
| |
| return leList; |
| } |
| |
| // This method connects the predecessors of cube operator with foreach |
| // operator and disconnects the cube operator from its predecessors |
| private void injectForeachOperator(SourceLocation loc, LOCube op, LOForEach foreach) |
| throws FrontendException { |
| // connect the foreach operator with predecessors of cube operator |
| List<Operator> opers = op.getPlan().getPredecessors(op); |
| for (Operator oper : opers) { |
| OperatorPlan foreachPlan = foreach.getPlan(); |
| foreachPlan.connect(oper, (Operator) foreach); |
| } |
| |
| // disconnect the cube operator from the plan |
| opers = foreach.getPlan().getPredecessors(foreach); |
| for (Operator lop : opers) { |
| List<Operator> succs = lop.getPlan().getSuccessors(lop); |
| for (Operator succ : succs) { |
| if (succ instanceof LOCube) { |
| succ.getPlan().disconnect(lop, succ); |
| succ.getPlan().remove(succ); |
| } |
| } |
| } |
| } |
| |
| // This methods if the dimensions specified by the user has duplicates |
| private void checkDuplicateProject(List<LogicalExpressionPlan> lExprPlan) |
| throws FrontendException { |
| |
| for (int i = 0; i < lExprPlan.size(); i++) { |
| for (int j = i + 1; j < lExprPlan.size(); j++) { |
| LogicalExpression outer = (LogicalExpression) lExprPlan.get(i).getSources().get(0); |
| LogicalExpression inner = (LogicalExpression) lExprPlan.get(j).getSources().get(0); |
| String outColAlias = ((ProjectExpression) outer).getColAlias(); |
| String inColAlias = ((ProjectExpression) inner).getColAlias(); |
| |
| if (outColAlias == null) { |
| outColAlias = outer.getFieldSchema().alias; |
| } |
| |
| if (inColAlias == null) { |
| inColAlias = inner.getFieldSchema().alias; |
| } |
| |
| if (outColAlias.equals(inColAlias) == true) { |
| lExprPlan.remove(j); |
| throw new FrontendException("Duplicate dimensions detected. Dimension name: " |
| + inColAlias); |
| } |
| } |
| } |
| } |
| |
| LOCogroup createGroupOp() { |
| return new LOCogroup( plan ); |
| } |
| |
| String buildGroupOp(SourceLocation loc, LOCogroup op, String alias, List<String> inputAliases, |
| MultiMap<Integer, LogicalExpressionPlan> expressionPlans, GROUPTYPE gt, List<Boolean> innerFlags, |
| String partitioner) throws ParserValidationException { |
| if( gt == GROUPTYPE.COLLECTED ) { |
| if( inputAliases.size() > 1 ) { |
| throw new ParserValidationException( intStream, loc, |
| "Collected group is only supported for single input" ); |
| } |
| |
| List<LogicalExpressionPlan> exprPlans = expressionPlans.get( 0 ); |
| for( LogicalExpressionPlan exprPlan : exprPlans ) { |
| Iterator<Operator> it = exprPlan.getOperators(); |
| while( it.hasNext() ) { |
| if( !( it.next() instanceof ProjectExpression ) ) { |
| throw new ParserValidationException( intStream, loc, |
| "Collected group is only supported for columns or star projection" ); |
| } |
| } |
| } |
| } |
| |
| boolean[] flags = new boolean[innerFlags.size()]; |
| for( int i = 0; i < innerFlags.size(); i++ ) { |
| flags[i] = innerFlags.get( i ); |
| } |
| op.setExpressionPlans( expressionPlans ); |
| op.setGroupType( gt ); |
| op.setInnerFlags( flags ); |
| alias = buildOp( loc, op, alias, inputAliases, partitioner ); |
| try { |
| (new ProjectStarExpander(op.getPlan())).visit(op); |
| (new ProjStarInUdfExpander(op.getPlan())).visit(op); |
| new SchemaResetter(op.getPlan(), true).visit(op); |
| } catch (FrontendException e ) { |
| throw new ParserValidationException( intStream, loc, e ); |
| } |
| |
| return alias; |
| } |
| |
| String buildLoadOp(SourceLocation loc, String alias, String filename, FuncSpec funcSpec, LogicalSchema schema) |
| throws ParserValidationException { |
| String absolutePath; |
| LoadFunc loFunc; |
| try { |
| // Load LoadFunc class from default properties if funcSpec is null. Fallback on PigStorage if LoadFunc is not specified in properties. |
| funcSpec = funcSpec == null ? new FuncSpec(pigContext.getProperties().getProperty(PigConfiguration.PIG_DEFAULT_LOAD_FUNC, PigStorage.class.getName())) : funcSpec; |
| loFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec); |
| String fileNameKey = QueryParserUtils.constructFileNameSignature(filename, funcSpec) + "_" + (loadIndex++); |
| absolutePath = fileNameMap.get(fileNameKey); |
| if (absolutePath == null) { |
| absolutePath = loFunc.relativeToAbsolutePath( filename, QueryParserUtils.getCurrentDir( pigContext ) ); |
| |
| if (absolutePath!=null && !(loFunc instanceof NonFSLoadFunc)) { |
| QueryParserUtils.setHdfsServers( absolutePath, pigContext ); |
| } |
| fileNameMap.put( fileNameKey, absolutePath ); |
| } |
| } catch(Exception ex) { |
| throw new ParserValidationException( intStream, loc, ex ); |
| } |
| |
| FileSpec loader = new FileSpec(absolutePath, funcSpec); |
| LOLoad op = new LOLoad( |
| loader, |
| schema, |
| plan, |
| ConfigurationUtil.toConfiguration(pigContext.getProperties()), |
| loFunc, |
| alias + "_" + newOperatorKey()); |
| op.setTmpLoad(false); |
| |
| // Check if there's a store in the plan already that this load |
| // depends on. If so, add it as an input alias |
| List<String> inputAliases = new ArrayList<String>(); |
| |
| // Get list of stores. The stores are not all sinks in the plan |
| // if they've already got successors. |
| Iterator<Operator> itr = plan.getOperators(); |
| List<LOStore> stores = new ArrayList<LOStore>(); |
| while (itr.hasNext()) { |
| Operator lop = itr.next(); |
| if (lop instanceof LOStore) { |
| stores.add((LOStore)lop); |
| } |
| } |
| |
| for (LOStore store : stores) { |
| String ifile = op.getFileSpec().getFileName(); |
| String ofile = store.getFileSpec().getFileName(); |
| if (ofile.equals(ifile)) { |
| inputAliases.add( store.getAlias() ); |
| } |
| } |
| |
| return buildOp( loc, op, alias, inputAliases, null ); |
| } |
| |
| private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias, |
| String inputAlias, String partitioner) throws ParserValidationException { |
| List<String> inputAliases = new ArrayList<String>(); |
| if( inputAlias != null ) |
| inputAliases.add( inputAlias ); |
| return buildOp( loc, op, alias, inputAliases, partitioner ); |
| } |
| |
| private void checkDuplicateAliases(List<String> inputAliases, SourceLocation loc, |
| String opName) throws ParserValidationException { |
| //Keep the count of the number of times the same Alias is used |
| Map<Operator, Integer> inputAliasesMap = new HashMap<Operator, Integer>(); |
| for(String a : inputAliases) { |
| Operator pred = operators.get( a ); |
| if (pred == null) { |
| throw new ParserValidationException( intStream, loc, "Unrecognized alias " + a ); |
| } |
| if (inputAliasesMap.containsKey(pred)) { |
| throw new ParserValidationException( intStream, loc, |
| "Pig does not accept same alias as input for " + opName + |
| " operation : " + a ); |
| } |
| else { |
| inputAliasesMap.put(pred, 1); |
| } |
| } |
| } |
| |
| private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias, |
| List<String> inputAliases, String partitioner) throws ParserValidationException { |
| setAlias( op, alias ); |
| setPartitioner( op, partitioner ); |
| op.setLocation( loc ); |
| plan.add( op ); |
| for( String a : inputAliases ) { |
| Operator pred = operators.get( a ); |
| if (pred==null) { |
| throw new ParserValidationException( intStream, loc, "Unrecognized alias " + a ); |
| } |
| plan.connect( pred, op ); |
| } |
| operators.put( op.getAlias(), op ); |
| pigContext.setLastAlias(op.getAlias()); |
| return op.getAlias(); |
| } |
| |
| String buildStoreOp(SourceLocation loc, String alias, String inputAlias, String filename, FuncSpec funcSpec) |
| throws ParserValidationException { |
| try { |
| // Load StoreFunc class from default properties if funcSpec is null. Fallback on PigStorage if StoreFunc is not specified in properties. |
| funcSpec = funcSpec == null ? new FuncSpec(pigContext.getProperties().getProperty( |
| PigConfiguration.PIG_DEFAULT_STORE_FUNC, PigStorage.class.getName())) : funcSpec; |
| StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(funcSpec); |
| String fileNameKey = inputAlias + "_" + (storeIndex++) ; |
| |
| String signature = inputAlias + "_" + newOperatorKey(); |
| stoFunc.setStoreFuncUDFContextSignature(signature); |
| |
| String absolutePath = fileNameMap.get(fileNameKey); |
| if (absolutePath == null) { |
| absolutePath = stoFunc.relToAbsPathForStoreLocation( |
| filename, |
| QueryParserUtils.getCurrentDir(pigContext)); |
| if (absolutePath!=null) { |
| QueryParserUtils.setHdfsServers(absolutePath, pigContext); |
| } |
| fileNameMap.put(fileNameKey, absolutePath); |
| } |
| FileSpec fileSpec = new FileSpec(absolutePath, funcSpec); |
| boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties(). |
| getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT)); |
| |
| LOStore op = new LOStore(plan, fileSpec, stoFunc, signature, disambiguationEnabled); |
| return buildOp(loc, op, alias, inputAlias, null); |
| } catch(Exception ex) { |
| throw new ParserValidationException(intStream, loc, ex); |
| } |
| } |
| |
| String buildAssertOp(SourceLocation loc, LOFilter filterOp, |
| String alias, String inputAlias, LogicalExpression expr, String comment, |
| LogicalExpressionPlan exprPlan) |
| throws ParserValidationException { |
| try { |
| filterOp.setAlias(inputAlias); |
| List<LogicalExpression> args = new ArrayList<LogicalExpression>(); |
| ConstantExpression lhs = new ConstantExpression(exprPlan, new Boolean(true)); |
| ConstantExpression rhs = new ConstantExpression(exprPlan, new Boolean(false)); |
| BinCondExpression binCond = new BinCondExpression(exprPlan, expr, lhs, rhs); |
| args.add(binCond); |
| ConstantExpression constExpr = new ConstantExpression(exprPlan, (comment == null ? "" : comment)); |
| args.add(constExpr); |
| UserFuncExpression udf = new UserFuncExpression(exprPlan, new FuncSpec( Assert.class.getName() ), args ); |
| exprPlan.add(udf); |
| filterOp.setFilterPlan(exprPlan); |
| // pass the inputAlias to alias |
| return buildFilterOp(loc, filterOp, inputAlias, inputAlias, exprPlan); |
| } catch (Exception ex) { |
| throw new ParserValidationException(intStream, loc, ex); |
| } |
| } |
| |
| private String newOperatorKey() { |
| return new OperatorKey( scope, getNextId() ).toString(); |
| } |
| |
| public static String newOperatorKey(String scope) { |
| return new OperatorKey( scope, getNextId(scope)).toString(); |
| } |
| |
| LOForEach createForeachOp() { |
| return new LOForEach( plan ); |
| } |
| |
| String buildForeachOp(SourceLocation loc, LOForEach op, String alias, String inputAlias, LogicalPlan innerPlan) |
| throws ParserValidationException { |
| op.setInnerPlan( innerPlan ); |
| alias = buildOp( loc, op, alias, inputAlias, null ); |
| try { |
| (new ProjectStarExpander(op.getPlan())).visit(op); |
| (new ProjStarInUdfExpander(op.getPlan())).visit(op); |
| new SchemaResetter(op.getPlan(), true).visit(op); |
| } catch (FrontendException e ) { |
| throw new ParserValidationException( intStream, loc, e ); |
| } |
| return alias; |
| } |
| |
| LOGenerate createGenerateOp(LogicalPlan plan) { |
| return new LOGenerate( plan ); |
| } |
| |
| void buildGenerateOp(SourceLocation loc, LOForEach foreach, LOGenerate gen, |
| List<LogicalExpressionPlan> exprPlans, List<Boolean> flattenFlags, |
| List<LogicalSchema> schemas) |
| throws ParserValidationException { |
| |
| boolean[] flags = new boolean[ flattenFlags.size() ]; |
| for( int i = 0; i < flattenFlags.size(); i++ ) |
| flags[i] = flattenFlags.get( i ); |
| LogicalPlan innerPlan = (LogicalPlan)gen.getPlan(); |
| ArrayList<Operator> inputs = new ArrayList<Operator>(); |
| int idx = 0; |
| for( LogicalExpressionPlan exprPlan : exprPlans ) { |
| LogicalExpression expr = (LogicalExpression)exprPlan.getSources().get(0); |
| LogicalSchema userSchema = schemas.get(idx); |
| if (userSchema == null && expr.hasFieldSchema()) { |
| LogicalSchema ls = new LogicalSchema(); |
| try { |
| ls.addField(expr.getFieldSchema()); |
| schemas.set(idx, ls); |
| } catch (FrontendException e) { |
| // if we get an exception, then we have no schema to set |
| } |
| } |
| idx++; |
| try { |
| processExpressionPlan( foreach, innerPlan, exprPlan, inputs ); |
| } catch (FrontendException e) { |
| throw new ParserValidationException(intStream, loc, e); |
| } |
| } |
| |
| gen.setOutputPlans( exprPlans ); |
| gen.setFlattenFlags( flags ); |
| gen.setUserDefinedSchema( schemas ); |
| innerPlan.add( gen ); |
| gen.setLocation( loc ); |
| for( Operator input : inputs ) { |
| innerPlan.connect( input, gen ); |
| } |
| } |
| |
| /** |
| * Process expression plans of LOGenerate and set inputs relation |
| * for the ProjectExpression |
| * @param foreach |
| * @param lp Logical plan in which the LOGenerate is in |
| * @param plan One of the output expression of the LOGenerate |
| * @param inputs inputs of the LOGenerate |
| * @throws FrontendException |
| */ |
| private static void processExpressionPlan(LOForEach foreach, |
| LogicalPlan lp, |
| LogicalExpressionPlan plan, |
| ArrayList<Operator> inputs ) throws FrontendException { |
| Iterator<Operator> it = plan.getOperators(); |
| while( it.hasNext() ) { |
| Operator sink = it.next(); |
| //check all ProjectExpression |
| if( sink instanceof ProjectExpression ) { |
| ProjectExpression projExpr = (ProjectExpression)sink; |
| String colAlias = projExpr.getColAlias(); |
| if( projExpr.isRangeProject()){ |
| |
| LOInnerLoad innerLoad = new LOInnerLoad( lp, foreach, |
| new ProjectExpression(projExpr, new LogicalExpressionPlan()) |
| ); |
| setupInnerLoadAndProj(innerLoad, projExpr, lp, inputs); |
| } else if( colAlias != null ) { |
| // the project is using a column alias |
| Operator op = projExpr.getProjectedOperator(); |
| if( op != null ) { |
| // this means the project expression refers to a relation |
| // in the nested foreach |
| |
| //add the relation to inputs of LOGenerate and set |
| // projection input |
| int index = inputs.indexOf( op ); |
| if( index == -1 ) { |
| index = inputs.size(); |
| inputs.add( op ); |
| } |
| projExpr.setInputNum( index ); |
| projExpr.setColNum( -1 ); |
| } else { |
| // this means the project expression refers to a column |
| // in the input of foreach. Add a LOInnerLoad and use that |
| // as input |
| LOInnerLoad innerLoad = new LOInnerLoad( lp, foreach, colAlias ); |
| setupInnerLoadAndProj(innerLoad, projExpr, lp, inputs); |
| } |
| } else { |
| // the project expression is referring to column in ForEach input |
| // using position (eg $1) |
| LOInnerLoad innerLoad = new LOInnerLoad( lp, foreach, projExpr.getColNum() ); |
| setupInnerLoadAndProj(innerLoad, projExpr, lp, inputs); |
| } |
| } |
| } |
| } |
| |
| private static void setupInnerLoadAndProj(LOInnerLoad innerLoad, |
| ProjectExpression projExpr, LogicalPlan lp, |
| ArrayList<Operator> inputs) { |
| innerLoad.setLocation( projExpr.getLocation() ); |
| projExpr.setInputNum( inputs.size() ); |
| projExpr.setColNum( -1 ); // Projection Expression on InnerLoad is always (*). |
| lp.add( innerLoad ); |
| inputs.add( innerLoad ); |
| |
| } |
| |
| Operator buildNestedOperatorInput(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach, |
| Map<String, Operator> operators, LogicalExpression expr) |
| throws NonProjectExpressionException, ParserValidationException { |
| OperatorPlan plan = expr.getPlan(); |
| Iterator<Operator> it = plan.getOperators(); |
| if( !( it.next() instanceof ProjectExpression ) || it.hasNext() ) { |
| throw new NonProjectExpressionException( intStream, loc, expr ); |
| } |
| Operator op = null; |
| ProjectExpression projExpr = (ProjectExpression)expr; |
| String colAlias = projExpr.getColAlias(); |
| if( colAlias != null ) { |
| op = operators.get( colAlias ); |
| if( op == null ) { |
| op = createInnerLoad(loc, innerPlan, foreach, colAlias ); |
| op.setLocation( projExpr.getLocation() ); |
| innerPlan.add( op ); |
| } |
| } else { |
| op = new LOInnerLoad( innerPlan, foreach, projExpr.getColNum() ); |
| op.setLocation( projExpr.getLocation() ); |
| innerPlan.add( op ); |
| } |
| return op; |
| } |
| |
| private LOInnerLoad createInnerLoad(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach, |
| String colAlias) throws ParserValidationException { |
| try { |
| return new LOInnerLoad( innerPlan, foreach, colAlias ); |
| } catch (FrontendException e) { |
| throw new ParserValidationException(intStream, loc, e); |
| } |
| } |
| |
| StreamingCommand buildCommand(SourceLocation loc, String cmd, List<String> shipPaths, List<String> cachePaths, |
| List<HandleSpec> inputHandleSpecs, List<HandleSpec> outputHandleSpecs, |
| String logDir, Integer limit) throws RecognitionException { |
| StreamingCommand command = null; |
| try { |
| command = buildCommand( loc, cmd ); |
| |
| // Process ship paths |
| if( shipPaths != null ) { |
| if( shipPaths.size() == 0 ) { |
| command.setShipFiles( false ); |
| } else { |
| for( String path : shipPaths ) |
| command.addPathToShip( path ); |
| } |
| } |
| |
| // Process cache paths |
| if( cachePaths != null ) { |
| for( String path : cachePaths ) |
| command.addPathToCache( path ); |
| } |
| |
| // Process input handle specs |
| if( inputHandleSpecs != null ) { |
| for( HandleSpec spec : inputHandleSpecs ) |
| command.addHandleSpec( Handle.INPUT, spec ); |
| } |
| |
| // Process output handle specs |
| if( outputHandleSpecs != null ) { |
| for( HandleSpec spec : outputHandleSpecs ) |
| command.addHandleSpec( Handle.OUTPUT, spec ); |
| } |
| |
| // error handling |
| if( logDir != null ) |
| command.setLogDir( logDir ); |
| if( limit != null ) |
| command.setLogFilesLimit( limit ); |
| } catch(IOException e) { |
| throw new PlanGenerationFailureException( intStream, loc, e ); |
| } |
| |
| return command; |
| } |
| |
| StreamingCommand buildCommand(SourceLocation loc, String cmd) throws RecognitionException { |
| try { |
| String[] args = StreamingCommandUtils.splitArgs( cmd ); |
| StreamingCommand command = new StreamingCommand( pigContext, args ); |
| StreamingCommandUtils validator = new StreamingCommandUtils( pigContext ); |
| validator.checkAutoShipSpecs( command, args ); |
| return command; |
| } catch (ParserException e) { |
| throw new InvalidCommandException( intStream, loc, cmd ); |
| } |
| } |
| |
| String buildStreamOp(SourceLocation loc, String alias, String inputAlias, StreamingCommand command, |
| LogicalSchema schema, IntStream input) |
| throws RecognitionException { |
| try { |
| LOStream op = new LOStream( plan, pigContext.createExecutableManager(), command, schema ); |
| return buildOp( loc, op, alias, inputAlias, null ); |
| } catch (ExecException ex) { |
| throw new PlanGenerationFailureException( input, loc, ex ); |
| } |
| } |
| |
| String buildNativeOp(SourceLocation loc, String inputJar, String cmd, |
| List<String> paths, String storeAlias, String loadAlias, IntStream input) |
| throws RecognitionException { |
| LONative op; |
| try { |
| op = new LONative( plan, inputJar, StreamingCommandUtils.splitArgs( cmd ) ); |
| pigContext.addJar( inputJar ); |
| for( String path : paths ) |
| pigContext.addJar( path ); |
| buildOp( loc, op, null, new ArrayList<String>(), null ); |
| ((LOStore)operators.get( storeAlias )).setTmpStore(true); |
| plan.connect( operators.get( storeAlias ), op ); |
| LOLoad load = (LOLoad)operators.get( loadAlias ); |
| plan.connect( op, load ); |
| return load.getAlias(); |
| } catch (ParserException e) { |
| throw new InvalidCommandException( input, loc, cmd ); |
| } catch (MalformedURLException e) { |
| throw new InvalidPathException( input, loc, e); |
| } |
| } |
| |
| void setAlias(LogicalRelationalOperator op, String alias) { |
| if( alias == null ) |
| alias = newOperatorKey(); |
| op.setAlias( alias ); |
| } |
| |
| void setParallel(LogicalRelationalOperator op, Integer parallel) { |
| if( parallel != null ) { |
| op.setRequestedParallelism( parallel ); |
| } |
| } |
| |
| static void setPartitioner(LogicalRelationalOperator op, String partitioner) { |
| if( partitioner != null ) |
| op.setCustomPartitioner( partitioner ); |
| } |
| |
| FuncSpec buildFuncSpec(SourceLocation loc, String funcName, List<String> args, byte ft) throws RecognitionException { |
| String[] argArray = new String[args.size()]; |
| FuncSpec funcSpec = new FuncSpec( funcName, args.size() == 0 ? null : args.toArray( argArray ) ); |
| validateFuncSpec( loc, funcSpec, ft ); |
| return funcSpec; |
| } |
| |
| private void validateFuncSpec(SourceLocation loc, FuncSpec funcSpec, byte ft) throws RecognitionException { |
| switch (ft) { |
| case FunctionType.COMPARISONFUNC: |
| case FunctionType.LOADFUNC: |
| case FunctionType.STOREFUNC: |
| case FunctionType.STREAMTOPIGFUNC: |
| case FunctionType.PIGTOSTREAMFUNC: |
| try{ |
| Class<?> func = PigContext.resolveClassName(funcSpec.getClassName()); |
| FunctionType.tryCasting(func, ft); |
| } catch(Exception ex){ |
| throw new ParserValidationException(intStream, loc, ex); |
| } |
| } |
| } |
| |
| static String unquote(String s) { |
| return StringUtils.unescapeInputString( s.substring(1, s.length() - 1 ) ); |
| } |
| |
| static int undollar(String s) { |
| return Integer.parseInt( s.substring( 1, s.length() ) ); |
| } |
| |
| /** |
| * Parse the long given as a string such as "34L". |
| */ |
| static long parseLong(String s) { |
| String num = s.substring( 0, s.length() - 1 ); |
| return Long.parseLong( num ); |
| } |
| |
| /** |
| * Parse big integer formatted string (e.g. "1234567890123BI") into BigInteger object |
| */ |
| static BigInteger parseBigInteger(String s) { |
| String num = s.substring( 0, s.length() - 2 ); |
| return new BigInteger( num ); |
| } |
| |
| /** |
| * Parse big decimal formatted string (e.g. "123456.7890123BD") into BigDecimal object |
| */ |
| static BigDecimal parseBigDecimal(String s) { |
| String num = s.substring( 0, s.length() - 2 ); |
| return new BigDecimal( num ); |
| } |
| |
| static Tuple buildTuple(List<Object> objList) { |
| TupleFactory tf = TupleFactory.getInstance(); |
| return tf.newTuple( objList ); |
| } |
| |
| static DataBag createDataBag() { |
| BagFactory bagFactory = BagFactory.getInstance(); |
| return bagFactory.newDefaultBag(); |
| } |
| |
| /** |
| * Build a project expression in foreach inner plan. |
| * The only difference here is that the projection can be for an expression alias, for which |
| * we will return whatever the expression alias represents. |
| * @throws RecognitionException |
| */ |
| LogicalExpression buildProjectExpr(SourceLocation loc, LogicalExpressionPlan plan, LogicalRelationalOperator op, |
| Map<String, Operator> operators, Map<String, LogicalExpressionPlan> exprPlans, String colAlias, int col) |
| throws RecognitionException { |
| ProjectExpression result = null; |
| |
| if( colAlias != null ) { |
| LogicalExpressionPlan exprPlan = exprPlans.get( colAlias ); |
| if( exprPlan != null ) { |
| LogicalExpressionPlan planCopy = null; |
| try { |
| planCopy = exprPlan.deepCopy(); |
| plan.merge( planCopy ); |
| } catch (FrontendException ex) { |
| throw new PlanGenerationFailureException( intStream, loc, ex ); |
| } |
| // The projected alias is actually expression alias, so the projections in the represented |
| // expression doesn't have any operator associated with it. We need to set it when we |
| // substitute the expression alias with the its expression. |
| if( op != null ) { |
| Iterator<Operator> it = plan.getOperators(); |
| while( it.hasNext() ) { |
| Operator o = it.next(); |
| if( o instanceof ProjectExpression ) { |
| ProjectExpression projExpr = (ProjectExpression)o; |
| projExpr.setAttachedRelationalOp( op ); |
| } |
| } |
| } |
| LogicalExpression root = (LogicalExpression)planCopy.getSources().get( 0 );// get the root of the plan |
| LogicalFieldSchema schema; |
| try { |
| schema = root.getFieldSchema(); |
| if (schema.alias == null) { |
| schema.alias = colAlias; |
| } |
| } catch (FrontendException e) { |
| // Sometimes it can throw an exception. If it does, then there is no schema to get |
| } |
| return root; |
| } else { |
| result = new ProjectExpression( plan, 0, colAlias, operators.get( colAlias ), op ); |
| result.setLocation( loc ); |
| return result; |
| } |
| } |
| result = new ProjectExpression( plan, 0, col, op ); |
| result.setLocation( loc ); |
| return result; |
| } |
| |
| /** |
| * Build a project expression for a projection present in global plan (not in nested foreach plan). |
| * @throws ParserValidationException |
| */ |
| LogicalExpression buildProjectExpr(SourceLocation loc, |
| LogicalExpressionPlan plan, LogicalRelationalOperator relOp, |
| int input, String colAlias, int col) |
| throws ParserValidationException { |
| ProjectExpression result = null; |
| result = colAlias != null ? |
| new ProjectExpression( plan, input, colAlias, null, relOp ) : |
| new ProjectExpression( plan, input, col, relOp ); |
| result.setLocation( loc ); |
| return result; |
| } |
| |
| /** |
| * Build a project expression that projects a range of columns |
| * @param loc |
| * @param plan |
| * @param relOp |
| * @param input |
| * @param startExpr the first expression to be projected, null |
| * if everything from first is to be projected |
| * @param endExpr the last expression to be projected, null |
| * if everything to the end is to be projected |
| * @return project expression |
| * @throws ParserValidationException |
| */ |
| LogicalExpression buildRangeProjectExpr(SourceLocation loc, LogicalExpressionPlan plan, LogicalRelationalOperator relOp, |
| int input, LogicalExpression startExpr, LogicalExpression endExpr) |
| throws ParserValidationException { |
| |
| if(startExpr == null && endExpr == null){ |
| // should not reach here as the parser is enforcing this condition |
| String msg = "in range project (..) at least one of start or end " + |
| "has to be specified. Use project-star (*) instead."; |
| throw new ParserValidationException(intStream, loc, msg); |
| } |
| |
| ProjectExpression proj = new ProjectExpression(plan, input, relOp); |
| |
| //set first column to be projected |
| if(startExpr != null){ |
| checkRangeProjectExpr(loc, startExpr); |
| ProjectExpression startProj = (ProjectExpression)startExpr; |
| if(startProj.getColAlias() != null){ |
| try { |
| proj.setStartAlias(startProj.getColAlias()); |
| } catch (FrontendException e) { |
| throw new ParserValidationException(intStream, loc, e); |
| } |
| }else{ |
| proj.setStartCol(startProj.getColNum()); |
| } |
| }else{ |
| proj.setStartCol(0);//project from first column |
| } |
| |
| //set last column to be projected |
| if(endExpr != null){ |
| checkRangeProjectExpr(loc, endExpr); |
| ProjectExpression endProj = (ProjectExpression)endExpr; |
| if(endProj.getColAlias() != null){ |
| try { |
| proj.setEndAlias(endProj.getColAlias()); |
| } catch (FrontendException e) { |
| throw new ParserValidationException(intStream, loc, e); |
| } |
| }else{ |
| proj.setEndCol(endProj.getColNum()); |
| } |
| }else{ |
| proj.setEndCol(-1); //project to last column |
| } |
| |
| try { |
| if(startExpr != null) |
| plan.removeAndReconnect(startExpr); |
| if(endExpr != null) |
| plan.removeAndReconnect(endExpr); |
| } catch (FrontendException e) { |
| throw new ParserValidationException(intStream, loc, e); |
| } |
| |
| return proj; |
| } |
| |
| private void checkRangeProjectExpr(SourceLocation loc, LogicalExpression startExpr) |
| throws ParserValidationException { |
| if(! (startExpr instanceof ProjectExpression)){ |
| // should not reach here as the parser is enforcing this condition |
| String msg = "range project (..) can have only a simple column." + |
| " Found :" + startExpr; |
| throw new ParserValidationException(intStream, loc, msg); |
| } |
| |
| } |
| |
| LogicalExpression buildInvokerUDF(SourceLocation loc, LogicalExpressionPlan plan, String packageName, String funcName, boolean isStatic, List<LogicalExpression> args) throws RecognitionException { |
| LogicalExpression le = new UserFuncExpression(plan, new FuncSpec(InvokerGenerator.class.getName()), args, false, true, isStatic, packageName, funcName); |
| le.setLocation(loc); |
| return le; |
| } |
| |
| public static Class<?> typeToClass(Class<?> clazz) { |
| if (clazz == Integer.TYPE) { |
| return Integer.class; |
| } else if (clazz == Long.TYPE) { |
| return Long.class; |
| } else if (clazz == Float.TYPE) { |
| return Long.class; |
| } else if (clazz == Double.TYPE) { |
| return Long.class; |
| } else if (clazz == Boolean.TYPE) { |
| return Long.class; |
| } else if (clazz == Short.TYPE) { |
| return Short.class; |
| } else if (clazz == Byte.TYPE) { |
| return Byte.class; |
| } else if (clazz == Character.TYPE) { |
| return Character.class; |
| } else { |
| throw new RuntimeException("Was not given a primitive TYPE class: " + clazz); |
| } |
| } |
| |
| LogicalExpression buildUDF(SourceLocation loc, LogicalExpressionPlan plan, |
| String funcName, List<LogicalExpression> args) |
| throws RecognitionException { |
| |
| Class<?> func; |
| try { |
| func = pigContext.getClassForAlias(funcName); |
| FunctionType.tryCasting(func, FunctionType.EVALFUNC); |
| } catch (Exception e) { |
| throw new PlanGenerationFailureException(intStream, loc, e); |
| } |
| |
| FuncSpec funcSpec = pigContext.getFuncSpecFromAlias(funcName); |
| LogicalExpression le; |
| if( funcSpec == null ) { |
| funcName = func.getName(); |
| funcSpec = new FuncSpec(funcName); |
| //this point is only reached if there was no DEFINE statement for funcName |
| //in which case, we pass that information along |
| le = new UserFuncExpression(plan, funcSpec, args, false); |
| } else { |
| le = new UserFuncExpression(plan, funcSpec, args, true); |
| } |
| |
| le.setLocation(loc); |
| return le; |
| } |
| |
| private long getNextId() { |
| return getNextId(scope); |
| } |
| |
| static LOFilter createNestedFilterOp(LogicalPlan plan) { |
| return new LOFilter( plan ); |
| } |
| |
| static LOLimit createNestedLimitOp(LogicalPlan plan) { |
| return new LOLimit ( plan ); |
| } |
| |
| // Build operator for foreach inner plan. |
| Operator buildNestedFilterOp(SourceLocation loc, LOFilter op, LogicalPlan plan, String alias, |
| Operator inputOp, LogicalExpressionPlan expr) { |
| op.setFilterPlan( expr ); |
| buildNestedOp( loc, plan, op, alias, inputOp ); |
| return op; |
| } |
| |
| Operator buildNestedDistinctOp(SourceLocation loc, LogicalPlan plan, String alias, Operator inputOp) { |
| LODistinct op = new LODistinct( plan ); |
| buildNestedOp( loc, plan, op, alias, inputOp ); |
| return op; |
| } |
| |
| Operator buildNestedLimitOp(SourceLocation loc, LogicalPlan plan, String alias, Operator inputOp, long limit) { |
| LOLimit op = new LOLimit( plan, limit ); |
| buildNestedOp( loc, plan, op, alias, inputOp ); |
| return op; |
| } |
| |
| Operator buildNestedLimitOp(SourceLocation loc, LOLimit op, LogicalPlan plan, String alias, |
| Operator inputOp, LogicalExpressionPlan expr) { |
| op.setLimitPlan( expr ); |
| buildNestedOp( loc, plan, op, alias, inputOp ); |
| return op; |
| } |
| |
| Operator buildNestedCrossOp(SourceLocation loc, LogicalPlan plan, String alias, List<Operator> inputOpList) { |
| LOCross op = new LOCross( plan ); |
| op.setNested(true); |
| buildNestedOp( loc, plan, op, alias, inputOpList ); |
| return op; |
| } |
| |
| private void buildNestedOp(SourceLocation loc, LogicalPlan plan, LogicalRelationalOperator op, |
| String alias, Operator inputOp) { |
| op.setLocation( loc ); |
| setAlias( op, alias ); |
| plan.add( op ); |
| plan.connect( inputOp, op ); |
| } |
| |
| private void buildNestedOp(SourceLocation loc, LogicalPlan plan, LogicalRelationalOperator op, |
| String alias, List<Operator> inputOpList) { |
| op.setLocation( loc ); |
| setAlias( op, alias ); |
| plan.add( op ); |
| for (Operator inputOp : inputOpList) { |
| plan.connect( inputOp, op ); |
| } |
| } |
| |
| static LOSort createNestedSortOp(LogicalPlan plan) { |
| return new LOSort( plan ); |
| } |
| |
| /** |
| * For any UNKNOWN type in the schema fields, set the type to BYTEARRAY |
| * @param sch |
| */ |
| static void setBytearrayForNULLType(LogicalSchema sch){ |
| for(LogicalFieldSchema fs : sch.getFields()){ |
| if(fs.type == DataType.NULL){ |
| fs.type = DataType.BYTEARRAY; |
| } |
| if(fs.schema != null){ |
| setBytearrayForNULLType(fs.schema); |
| } |
| } |
| } |
| |
| static LOForEach createNestedForeachOp(LogicalPlan plan) { |
| return new LOForEach(plan); |
| } |
| |
| Operator buildNestedSortOp(SourceLocation loc, LOSort op, LogicalPlan plan, String alias, Operator inputOp, |
| List<LogicalExpressionPlan> plans, |
| List<Boolean> ascFlags, FuncSpec fs) { |
| op.setSortColPlans( plans ); |
| if (ascFlags.isEmpty()) { |
| for (int i=0;i<plans.size();i++) |
| ascFlags.add(true); |
| } |
| op.setAscendingCols( ascFlags ); |
| op.setUserFunc( fs ); |
| buildNestedOp( loc, plan, op, alias, inputOp ); |
| return op; |
| } |
| |
| Operator buildNestedForeachOp(SourceLocation loc, LOForEach op, LogicalPlan plan, String alias, |
| Operator inputOp, LogicalPlan innerPlan) |
| throws ParserValidationException { |
| op.setInnerPlan(innerPlan); |
| buildNestedOp(loc, plan, op, alias, inputOp); |
| return op; |
| } |
| |
| Operator buildNestedProjectOp(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach, |
| Map<String, Operator> operators, |
| String alias, |
| ProjectExpression projExpr, |
| List<LogicalExpressionPlan> exprPlans) |
| throws ParserValidationException { |
| Operator input = null; |
| String colAlias = projExpr.getColAlias(); |
| if( colAlias != null ) { |
| // ProjExpr refers to a name, which can be an alias for another operator or col name. |
| Operator op = operators.get( colAlias ); |
| if( op != null ) { |
| // ProjExpr refers to an operator alias. |
| input = op ; |
| } else { |
| // Assuming that ProjExpr refers to a column by name. Create an LOInnerLoad |
| input = createInnerLoad( loc, innerPlan, foreach, colAlias ); |
| input.setLocation( projExpr.getLocation() ); |
| } |
| } else { |
| // ProjExpr refers to a column by number. |
| input = new LOInnerLoad( innerPlan, foreach, projExpr.getColNum() ); |
| input.setLocation( projExpr.getLocation() ); |
| } |
| |
| LogicalPlan lp = new LogicalPlan(); // f's inner plan |
| LOForEach f = new LOForEach( innerPlan ); |
| f.setInnerPlan( lp ); |
| f.setLocation( loc ); |
| LOGenerate gen = new LOGenerate( lp ); |
| boolean[] flatten = new boolean[exprPlans.size()]; |
| |
| List<Operator> innerLoads = new ArrayList<Operator>( exprPlans.size() ); |
| for( LogicalExpressionPlan plan : exprPlans ) { |
| ProjectExpression pe = (ProjectExpression)plan.getSinks().get( 0 ); |
| String al = pe.getColAlias(); |
| LOInnerLoad iload = ( al == null ) ? |
| new LOInnerLoad( lp, f, pe.getColNum() ) : createInnerLoad(loc, lp, f, al ); |
| iload.setLocation( pe.getLocation() ); |
| pe.setColNum( -1 ); |
| pe.setInputNum( innerLoads.size() ); |
| pe.setAttachedRelationalOp( gen ); |
| innerLoads.add( iload ); |
| } |
| |
| gen.setOutputPlans( exprPlans ); |
| gen.setFlattenFlags( flatten ); |
| lp.add( gen ); |
| |
| for( Operator il : innerLoads ) { |
| lp.add( il ); |
| lp.connect( il, gen ); |
| } |
| |
| // Connect the inner load operators to gen |
| setAlias( f, alias ); |
| innerPlan.add( input ); |
| innerPlan.add( f ); |
| innerPlan.connect( input, f ); |
| return f; |
| } |
| |
| GROUPTYPE parseGroupType(String hint, SourceLocation loc) throws ParserValidationException { |
| String modifier = unquote( hint ); |
| |
| if( modifier.equalsIgnoreCase( "collected" ) ) { |
| return GROUPTYPE.COLLECTED; |
| } else if( modifier.equalsIgnoreCase( "regular" ) ){ |
| return GROUPTYPE.REGULAR; |
| } else if( modifier.equalsIgnoreCase( "merge" ) ){ |
| return GROUPTYPE.MERGE; |
| } else { |
| throw new ParserValidationException( intStream, loc, |
| "Only COLLECTED, REGULAR or MERGE are valid GROUP modifiers." ); |
| } |
| } |
| |
| JOINTYPE parseJoinType(String hint, SourceLocation loc) throws ParserValidationException { |
| String modifier = unquote( hint ); |
| |
| if( modifier.equalsIgnoreCase( "repl" ) || modifier.equalsIgnoreCase( "replicated" ) ) { |
| return JOINTYPE.REPLICATED; |
| } else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) { |
| return LOJoin.JOINTYPE.HASH; |
| } else if( modifier.equalsIgnoreCase( "bloom" ) ) { |
| return LOJoin.JOINTYPE.BLOOM; |
| } else if( modifier.equalsIgnoreCase( "skewed" ) ) { |
| return JOINTYPE.SKEWED; |
| } else if (modifier.equalsIgnoreCase("merge")) { |
| return JOINTYPE.MERGE; |
| } else if (modifier.equalsIgnoreCase("merge-sparse")) { |
| return JOINTYPE.MERGESPARSE; |
| } else { |
| throw new ParserValidationException( intStream, loc, |
| "Only REPL, REPLICATED, HASH, BLOOM, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." ); |
| } |
| } |
| |
| void putOperator(String alias, Operator op) { |
| operators.put(alias, op); |
| } |
| |
| public String getLastRel(SourceLocation loc) throws ParserValidationException { |
| if (lastRel == null) { |
| throw new ParserValidationException(intStream, loc, "Asked for last relation -- no relations have been defined"); |
| } |
| return lastRel; |
| } |
| |
| public String getLastRel() { |
| return lastRel; |
| } |
| |
| } |