blob: 16a2e27b6df0ff4ac0fd1dd1d4adaad77f434798 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ComparisonOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
import com.google.common.collect.HashBiMap;
/**
 * The base class for all types of physical plans.
 * This extends the Operator Plan.
 *
 * In addition to the graph bookkeeping done by {@link OperatorPlan}, this
 * class keeps each operator's own input list (see
 * {@link PhysicalOperator#getInputs()}) in step with the plan's edges when
 * operators are connected, removed or replaced.
 */
public class PhysicalPlan extends OperatorPlan<PhysicalOperator> implements Cloneable {

    private static final long serialVersionUID = 1L;

    // marker to indicate whether all input for this plan
    // has been sent - this is currently used in POStream
    // to know if all map() calls and reduce() calls are finished
    // and that there is no more input expected and in POPartialAgg.
    public boolean endOfAllInput = false;

    // When non-null, clone() records every (original operator, cloned
    // operator) pair in this map. Set via setOpMap(), cleared via resetOpMap().
    private MultiMap<PhysicalOperator, PhysicalOperator> opmap = null;

    /**
     * Creates an empty physical plan.
     */
    public PhysicalPlan() {
        super();
    }

    /**
     * Attaches the given tuple as input to every root operator of this plan.
     * @param t : tuple handed to each root's {@code attachInput}
     */
    public void attachInput(Tuple t) {
        List<PhysicalOperator> roots = getRoots();
        for (PhysicalOperator operator : roots) {
            operator.attachInput(t);
        }
    }

    /**
     * Detaches the input from every root operator of this plan.
     */
    public void detachInput() {
        for (PhysicalOperator op : getRoots()) {
            op.detachInput();
        }
    }

    /**
     * Write a visual representation of the Physical Plan
     * into the given output stream. Equivalent to
     * {@code explain(out, true)}.
     * @param out : OutputStream to which the visual representation is written
     */
    public void explain(OutputStream out) {
        explain(out, true);
    }

    /**
     * Write a visual representation of the Physical Plan
     * into the given output stream.
     *
     * Printing is best-effort: a {@link VisitorException} or
     * {@link IOException} raised by the printer is not propagated, only its
     * stack trace is printed.
     * @param out : OutputStream to which the visual representation is written
     * @param verbose : Amount of information to print
     */
    public void explain(OutputStream out, boolean verbose) {
        PlanPrinter<PhysicalOperator, PhysicalPlan> mpp =
            new PlanPrinter<PhysicalOperator, PhysicalPlan>(this);
        mpp.setVerbose(verbose);

        try {
            mpp.print(out);
        } catch (VisitorException e) {
            // Deliberately not rethrown: this method declares no checked
            // exceptions and toString() relies on it returning normally.
            e.printStackTrace();
        } catch (IOException e) {
            // Same best-effort handling as above.
            e.printStackTrace();
        }
    }

    /**
     * Write a visual representation of the Physical Plan
     * into the given printstream.
     *
     * Recognised formats are "xml" (prints a fixed "not supported" element
     * and nothing else), "text" and "dot". For any other value, including
     * {@code null}, only the header banner and a trailing blank line are
     * printed.
     * @param ps : PrintStream to which the visual representation is written
     * @param format : Format to print in
     * @param verbose : Amount of information to print
     */
    public void explain(PrintStream ps, String format, boolean verbose) {
        // Constant-first equals() so that a null format is treated like any
        // other unrecognised format instead of raising a NullPointerException.
        if ("xml".equals(format)) {
            ps.println("<physicalPlan>XML Not Supported</physicalPlan>");
            return;
        }

        ps.println("#-----------------------------------------------");
        ps.println("# Physical Plan:");
        ps.println("#-----------------------------------------------");

        if ("text".equals(format)) {
            explain((OutputStream) ps, verbose);
            ps.println("");
        } else if ("dot".equals(format)) {
            DotPOPrinter pp = new DotPOPrinter(this, ps);
            pp.setVerbose(verbose);
            pp.dump();
        }
        ps.println("");
    }

    /**
     * Connects two operators in the plan and then refreshes the inputs of
     * {@code to} from the plan's predecessor list for it.
     * @param from : operator the edge starts at
     * @param to : operator the edge ends at
     * @throws PlanException if the underlying plan rejects the connection
     */
    @Override
    public void connect(PhysicalOperator from, PhysicalOperator to)
            throws PlanException {
        super.connect(from, to);
        to.setInputs(getPredecessors(to));
    }

    /**
     * Removes an operator from the plan, clearing its own inputs and
     * dropping it from the input list of each of its successors.
     * @param op : operator to remove
     */
    @Override
    public void remove(PhysicalOperator op) {
        op.setInputs(null);
        List<PhysicalOperator> sucs = getSuccessors(op);
        if (sucs != null && !sucs.isEmpty()) {
            for (PhysicalOperator suc : sucs) {
                // successor could have multiple inputs
                // for example = POUnion - remove op from
                // its list of inputs - if after removal
                // there are no other inputs, set successor's
                // inputs to null
                List<PhysicalOperator> succInputs = suc.getInputs();
                if (succInputs == null) {
                    // Inputs already cleared (null is this class's own
                    // "no inputs" value); nothing to take op out of.
                    continue;
                }
                succInputs.remove(op);
                if (succInputs.isEmpty()) {
                    suc.setInputs(null);
                } else {
                    suc.setInputs(succInputs);
                }
            }
        }
        super.remove(op);
    }

    /* (non-Javadoc)
     * @see org.apache.pig.impl.plan.OperatorPlan#replace(org.apache.pig.impl.plan.Operator, org.apache.pig.impl.plan.Operator)
     */
    @Override
    public void replace(PhysicalOperator oldNode, PhysicalOperator newNode)
            throws PlanException {
        // Capture the successors before delegating to the superclass.
        List<PhysicalOperator> oldNodeSuccessors = getSuccessors(oldNode);
        super.replace(oldNode, newNode);
        if (oldNodeSuccessors != null) {
            for (PhysicalOperator successor : oldNodeSuccessors) {
                List<PhysicalOperator> inputs = successor.getInputs();
                if (inputs == null) {
                    // No input list to patch for this successor.
                    continue;
                }
                // now replace oldNode with newNode in
                // the input list of oldNode's successors
                // (identity comparison, as an operator may appear more than once)
                for (int i = 0; i < inputs.size(); i++) {
                    if (inputs.get(i) == oldNode) {
                        inputs.set(i, newNode);
                    }
                }
            }
        }
    }

    /* (non-Javadoc)
     * @see org.apache.pig.impl.plan.OperatorPlan#add(org.apache.pig.impl.plan.Operator)
    @Override
    public void add(PhysicalOperator op) {
        // attach this plan as the plan the operator is part of
        //op.setParentPlan(this);
        super.add(op);
    }
    */

    /**
     * @return true if this plan contains no operators
     */
    public boolean isEmpty() {
        return mOps.isEmpty();
    }

    /**
     * Returns the verbose explain output of this plan, or the fixed text
     * "Empty Plan!" when the plan has no operators.
     */
    @Override
    public String toString() {
        if (isEmpty()) {
            return "Empty Plan!";
        } else {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            explain(baos, true);
            return baos.toString();
        }
    }

    /**
     * Creates a copy of this plan in which every operator is cloned and the
     * clones are wired together the same way as the originals.
     *
     * If an op map has been installed with {@link #setOpMap(MultiMap)}, each
     * (original, clone) pair is also recorded in it.
     * @return the cloned plan
     * @throws CloneNotSupportedException if an operator cannot be cloned, if
     * an edge or input refers to an operator that has no clone, or if
     * connecting two clones fails
     */
    @Override
    public PhysicalPlan clone() throws CloneNotSupportedException {
        PhysicalPlan clone = new PhysicalPlan();

        // Get all the nodes in this plan, and clone them. As we make
        // clones, create a map between clone and original. Then walk the
        // connections in this plan and create equivalent connections in the
        // clone.
        Map<PhysicalOperator, PhysicalOperator> matches =
            new HashMap<PhysicalOperator, PhysicalOperator>(mOps.size());

        // Sorting just so that explain output (scope ids) is same in jdk7 and jdk8
        List<PhysicalOperator> opsToClone = new ArrayList<PhysicalOperator>(mOps.keySet());
        Collections.sort(opsToClone);
        for (PhysicalOperator op : opsToClone) {
            PhysicalOperator c = op.clone();
            clone.add(c);
            if (opmap != null) {
                opmap.put(op, c);
            }
            matches.put(op, c);
        }

        // Build the edges
        for (PhysicalOperator op : mFromEdges.keySet()) {
            PhysicalOperator cloneFrom = findClone(matches, op);
            Collection<PhysicalOperator> toOps = mFromEdges.get(op);
            for (PhysicalOperator toOp : toOps) {
                PhysicalOperator cloneTo = findClone(matches, toOp);
                try {
                    clone.connect(cloneFrom, cloneTo);
                } catch (PlanException pe) {
                    // clone() may only throw CloneNotSupportedException;
                    // keep the real failure as the cause.
                    CloneNotSupportedException cnse = new CloneNotSupportedException();
                    cnse.initCause(pe);
                    throw cnse;
                }
            }
        }

        // Fix up all the inputs in the operators themselves.
        for (PhysicalOperator op : mOps.keySet()) {
            List<PhysicalOperator> inputs = op.getInputs();
            if (inputs == null || inputs.size() == 0) {
                continue;
            }
            List<PhysicalOperator> newInputs =
                new ArrayList<PhysicalOperator>(inputs.size());
            PhysicalOperator cloneOp = findClone(matches, op);
            for (PhysicalOperator iOp : inputs) {
                newInputs.add(findClone(matches, iOp));
            }
            cloneOp.setInputs(newInputs);
        }

        // Re-point the operand references held by expression operators at
        // the cloned operands, and carry over the comparison operand type.
        for (PhysicalOperator op : mOps.keySet()) {
            if (op instanceof ComparisonOperator) {
                ComparisonOperator orig = (ComparisonOperator) op;
                ComparisonOperator cloneOp = (ComparisonOperator) matches.get(op);
                cloneOp.setOperandType(orig.getOperandType());
            }
            if (op instanceof UnaryExpressionOperator) {
                UnaryExpressionOperator orig = (UnaryExpressionOperator) op;
                UnaryExpressionOperator cloneOp = (UnaryExpressionOperator) matches.get(op);
                cloneOp.setExpr((ExpressionOperator) matches.get(orig.getExpr()));
            } else if (op instanceof BinaryExpressionOperator) {
                BinaryExpressionOperator orig = (BinaryExpressionOperator) op;
                BinaryExpressionOperator cloneOp = (BinaryExpressionOperator) matches.get(op);
                cloneOp.setRhs((ExpressionOperator) matches.get(orig.getRhs()));
                cloneOp.setLhs((ExpressionOperator) matches.get(orig.getLhs()));
            } else if (op instanceof POBinCond) {
                POBinCond orig = (POBinCond) op;
                POBinCond cloneOp = (POBinCond) matches.get(op);
                cloneOp.setRhs((ExpressionOperator) matches.get(orig.getRhs()));
                cloneOp.setLhs((ExpressionOperator) matches.get(orig.getLhs()));
                cloneOp.setCond((ExpressionOperator) matches.get(orig.getCond()));
            }
        }

        //Fix order of edges in mToEdges lists
        // The edges above were created while iterating mFromEdges, so a
        // clone's predecessor list may be ordered differently from the
        // original's; sort each one back into the original order.
        Map<PhysicalOperator, PhysicalOperator> invertedMatches = HashBiMap.create(matches).inverse();
        for (PhysicalOperator newOp : clone.mToEdges.keySet()) {
            List<PhysicalOperator> newList = clone.mToEdges.get(newOp);
            if (newList.size() > 1) {
                List<PhysicalOperator> originalList = this.mToEdges.get(invertedMatches.get(newOp));
                Collections.sort(newList, new EdgeOrderHelper(originalList, invertedMatches));
            }
        }

        return clone;
    }

    /**
     * Looks up the clone recorded for an original operator.
     * @param matches : map from original operator to its clone
     * @param op : original operator to look up
     * @return the clone of {@code op}
     * @throws CloneNotSupportedException if no clone is recorded for {@code op}
     */
    private static PhysicalOperator findClone(
            Map<PhysicalOperator, PhysicalOperator> matches,
            PhysicalOperator op) throws CloneNotSupportedException {
        PhysicalOperator found = matches.get(op);
        if (found == null) {
            String msg = "Unable to find clone for op " + op.name();
            throw new CloneNotSupportedException(msg);
        }
        return found;
    }

    /**
     * Installs the map in which {@link #clone()} records each
     * (original, clone) operator pair.
     * @param opmap : map to record into
     */
    public void setOpMap(MultiMap<PhysicalOperator, PhysicalOperator> opmap) {
        this.opmap = opmap;
    }

    /**
     * Clears the op map so that {@link #clone()} no longer records
     * operator pairs.
     */
    public void resetOpMap() {
        opmap = null;
    }

    /**
     * Orders cloned operators by the position of their originals in a
     * reference list taken from the original plan.
     */
    private static class EdgeOrderHelper implements Comparator<PhysicalOperator> {

        // Maps a cloned operator back to the original it was cloned from.
        private final Map<PhysicalOperator, PhysicalOperator> invertedMatches;
        // Original operators in the order the clones should end up in.
        private final List<PhysicalOperator> originalList;

        public EdgeOrderHelper(List<PhysicalOperator> originalList,
                Map<PhysicalOperator, PhysicalOperator> invertedMatches) {
            this.originalList = originalList;
            this.invertedMatches = invertedMatches;
        }

        @Override
        public int compare(PhysicalOperator o1, PhysicalOperator o2) {
            // indexOf() results are list positions (or -1), so the
            // subtraction cannot overflow.
            return originalList.indexOf(invertedMatches.get(o1))
                - originalList.indexOf(invertedMatches.get(o2));
        }
    }
}