| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.pig.newplan.logical.relational; |
| |
| import java.io.IOException; |
| import java.io.PrintStream; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import org.apache.pig.PigConstants; |
| import org.apache.pig.PigException; |
| import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; |
| import org.apache.pig.data.SchemaTupleBackend; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.PigImplConstants; |
| import org.apache.pig.impl.logicalLayer.FrontendException; |
| import org.apache.pig.impl.plan.CompilationMessageCollector; |
| import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType; |
| import org.apache.pig.impl.util.HashOutputStream; |
| import org.apache.pig.impl.util.ObjectSerializer; |
| import org.apache.pig.newplan.BaseOperatorPlan; |
| import org.apache.pig.newplan.Operator; |
| import org.apache.pig.newplan.OperatorPlan; |
| import org.apache.pig.newplan.logical.DotLOPrinter; |
| import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer; |
| import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter; |
| import org.apache.pig.newplan.logical.optimizer.SchemaResetter; |
| import org.apache.pig.newplan.logical.optimizer.UidResetter; |
| import org.apache.pig.newplan.logical.visitor.CastLineageSetter; |
| import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor; |
| import org.apache.pig.newplan.logical.visitor.DanglingNestedNodeRemover; |
| import org.apache.pig.newplan.logical.visitor.DuplicateForEachColumnRewriteVisitor; |
| import org.apache.pig.newplan.logical.visitor.ForEachUserSchemaVisitor; |
| import org.apache.pig.newplan.logical.visitor.ImplicitSplitInsertVisitor; |
| import org.apache.pig.newplan.logical.visitor.InputOutputFileValidatorVisitor; |
| import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator; |
| import org.apache.pig.newplan.logical.visitor.ScalarVisitor; |
| import org.apache.pig.newplan.logical.visitor.SchemaAliasVisitor; |
| import org.apache.pig.newplan.logical.visitor.SortInfoSetter; |
| import org.apache.pig.newplan.logical.visitor.StoreAliasSetter; |
| import org.apache.pig.newplan.logical.visitor.TypeCheckingRelVisitor; |
| import org.apache.pig.newplan.logical.visitor.UnionOnSchemaSetter; |
| import org.apache.pig.pen.POOptimizeDisabler; |
| import org.apache.pig.validator.BlackAndWhitelistValidator; |
| |
| import com.google.common.base.Splitter; |
| import com.google.common.collect.Lists; |
| import com.google.common.hash.HashFunction; |
| import com.google.common.hash.Hashing; |
| |
| /** |
| * LogicalPlan is the logical view of relational operations Pig will execute |
| * for a given script. Note that it contains only relational operations. |
| * All expressions will be contained in LogicalExpressionPlans inside |
| * each relational operator. |
| */ |
| public class LogicalPlan extends BaseOperatorPlan { |
| |
| public LogicalPlan(LogicalPlan other) { |
| // shallow copy constructor |
| super(other); |
| } |
| |
| public LogicalPlan() { |
| super(); |
| } |
| |
| /** |
| * Equality is checked by calling equals on every leaf in the plan. This |
| * assumes that plans are always connected graphs. It is somewhat |
| * inefficient since every leaf will test equality all the way to |
| * every root. But it is only intended for use in testing, so that |
| * should be ok. Checking predecessors (as opposed to successors) was |
| * chosen because splits (which have multiple successors) do not depend |
| * on order of outputs for correctness, whereas joins (with multiple |
| * predecessors) do. That is, reversing the outputs of split in the |
| * graph has no correctness implications, whereas reversing the inputs |
| * of join can. This method of doing equals will detect predecessors |
| * in different orders but not successors in different orders. |
| * It will return false if either plan has non deterministic EvalFunc. |
| */ |
| @Override |
| public boolean isEqual(OperatorPlan other) throws FrontendException { |
| if (other == null || !(other instanceof LogicalPlan)) { |
| return false; |
| } |
| |
| return super.isEqual(other); |
| } |
| |
| @Override |
| public void explain(PrintStream ps, String format, boolean verbose) |
| throws FrontendException { |
| if (format.equals("xml")) { |
| ps.println("<logicalPlan>XML Not Supported</logicalPlan>"); |
| return; |
| } |
| |
| ps.println("#-----------------------------------------------"); |
| ps.println("# New Logical Plan:"); |
| ps.println("#-----------------------------------------------"); |
| |
| if (this.size() == 0) { |
| ps.println("Logical plan is empty."); |
| } else if (format.equals("dot")) { |
| DotLOPrinter lpp = new DotLOPrinter(this, ps); |
| lpp.dump(); |
| } else { |
| LogicalPlanPrinter npp = new LogicalPlanPrinter(this, ps); |
| npp.visit(); |
| } |
| } |
| |
| public Operator findByAlias(String alias) { |
| Iterator<Operator> it = getOperators(); |
| List<Operator> ops = new ArrayList<Operator>(); |
| while( it.hasNext() ) { |
| LogicalRelationalOperator op = (LogicalRelationalOperator) it.next(); |
| if(op.getAlias() == null) |
| continue; |
| if(op.getAlias().equals( alias ) ) { |
| ops.add( op ); |
| } |
| } |
| |
| if( ops.isEmpty() ) { |
| return null; |
| } else { |
| return ops.get( ops.size() - 1 ); // Last one |
| } |
| } |
| |
| /** |
| * Returns the signature of the LogicalPlan. The signature is a unique identifier for a given |
| * plan generated by a Pig script. The same script run multiple times with the same version of |
| * Pig is guaranteed to produce the same signature, even if the input or output locations differ. |
| * |
| * @return a unique identifier for the logical plan |
| * @throws FrontendException if signature can't be computed |
| */ |
| public String getSignature() throws FrontendException { |
| |
| // Use a streaming hash function. We use a murmur_32 function with a constant seed, 0. |
| HashFunction hf = Hashing.murmur3_32(0); |
| HashOutputStream hos = new HashOutputStream(hf); |
| PrintStream ps = new PrintStream(hos); |
| |
| LogicalPlanPrinter printer = new LogicalPlanPrinter(this, ps); |
| printer.visit(); |
| |
| return Integer.toString(hos.getHashCode().asInt()); |
| } |
| |
| public void validate(PigContext pigContext, String scope, boolean skipInputOutputValidation) |
| throws FrontendException { |
| |
| new DanglingNestedNodeRemover(this).visit(); |
| new ColumnAliasConversionVisitor(this).visit(); |
| new SchemaAliasVisitor(this).visit(); |
| new ScalarVisitor(this, pigContext, scope).visit(); |
| new ForEachUserSchemaVisitor(this).visit(); |
| |
| // ImplicitSplitInsertVisitor has to be called before |
| // DuplicateForEachColumnRewriteVisitor. Detail at pig-1766 |
| new ImplicitSplitInsertVisitor(this).visit(); |
| |
| // DuplicateForEachColumnRewriteVisitor should be before |
| // TypeCheckingRelVisitor which does resetSchema/getSchema |
| // heavily |
| new DuplicateForEachColumnRewriteVisitor(this).visit(); |
| |
| CompilationMessageCollector collector = new CompilationMessageCollector() ; |
| |
| new TypeCheckingRelVisitor( this, collector).visit(); |
| |
| |
| new UnionOnSchemaSetter(this).visit(); |
| new CastLineageSetter(this, collector).visit(); |
| new ScalarVariableValidator(this).visit(); |
| new StoreAliasSetter(this).visit(); |
| |
| // compute whether output data is sorted or not |
| new SortInfoSetter(this).visit(); |
| |
| boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning")); |
| |
| if(aggregateWarning) { |
| CompilationMessageCollector.logMessages(collector, MessageType.Warning, aggregateWarning, log); |
| } else { |
| for(Enum type: MessageType.values()) { |
| CompilationMessageCollector.logAllMessages(collector, log); |
| } |
| } |
| |
| if (!(skipInputOutputValidation || pigContext.inExplain || pigContext.inDumpSchema)) { |
| // Validate input/output file |
| new InputOutputFileValidatorVisitor(this, pigContext).visit(); |
| } |
| |
| BlackAndWhitelistValidator validator = new BlackAndWhitelistValidator(pigContext, this); |
| validator.validate(); |
| |
| // Now make sure the plan is consistent |
| UidResetter uidResetter = new UidResetter(this); |
| uidResetter.visit(); |
| |
| SchemaResetter schemaResetter = new SchemaResetter(this, |
| true /* skip duplicate uid check*/); |
| schemaResetter.visit(); |
| } |
| |
| public void optimize(PigContext pigContext) throws FrontendException { |
| if (pigContext.inIllustrator) { |
| // disable all PO-specific optimizations |
| POOptimizeDisabler pod = new POOptimizeDisabler(this); |
| pod.visit(); |
| } |
| |
| HashSet<String> disabledOptimizerRules; |
| try { |
| disabledOptimizerRules = (HashSet<String>) ObjectSerializer |
| .deserialize(pigContext.getProperties().getProperty( |
| PigImplConstants.PIG_OPTIMIZER_RULES_KEY)); |
| } catch (IOException ioe) { |
| int errCode = 2110; |
| String msg = "Unable to deserialize optimizer rules."; |
| throw new FrontendException(msg, errCode, PigException.BUG, ioe); |
| } |
| if (disabledOptimizerRules == null) { |
| disabledOptimizerRules = new HashSet<String>(); |
| } |
| |
| String pigOptimizerRulesDisabled = pigContext.getProperties() |
| .getProperty(PigConstants.PIG_OPTIMIZER_RULES_DISABLED_KEY); |
| if (pigOptimizerRulesDisabled != null) { |
| disabledOptimizerRules.addAll(Lists.newArrayList((Splitter.on(",") |
| .split(pigOptimizerRulesDisabled)))); |
| } |
| |
| if (pigContext.inIllustrator) { |
| disabledOptimizerRules.add("MergeForEach"); |
| disabledOptimizerRules.add("PartitionFilterOptimizer"); |
| disabledOptimizerRules.add("LimitOptimizer"); |
| disabledOptimizerRules.add("SplitFilter"); |
| disabledOptimizerRules.add("PushUpFilter"); |
| disabledOptimizerRules.add("MergeFilter"); |
| disabledOptimizerRules.add("PushDownForEachFlatten"); |
| disabledOptimizerRules.add("ColumnMapKeyPrune"); |
| disabledOptimizerRules.add("AddForEach"); |
| disabledOptimizerRules.add("GroupByConstParallelSetter"); |
| } |
| |
| try { |
| SchemaTupleBackend.initialize(ConfigurationUtil.toConfiguration(pigContext.getProperties(), true), |
| pigContext); |
| } catch (IOException e) { |
| throw new FrontendException(e); |
| } |
| // run optimizer |
| LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer(this, 100, |
| disabledOptimizerRules, pigContext); |
| optimizer.optimize(); |
| } |
| } |