blob: 87baf4a02074babda3ef21b9bd34d90b22c6d8b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.spark.plan;
import java.io.PrintStream;
import java.util.LinkedList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.pig.impl.plan.DotPlanDumper;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.DotPOPrinter;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanVisitor;
/**
* This class can print Spark plan in the DOT format. It uses
* clusters to illustrate nesting. If "verbose" is off, it will skip
* any nesting in the associated physical plans.
*/
public class DotSparkPrinter extends DotPlanDumper<SparkOperator, SparkOperPlan,
DotSparkPrinter.InnerOperator,
DotSparkPrinter.InnerPlan> {
private static int counter = 0;
private boolean isVerboseNesting = true;
public DotSparkPrinter(SparkOperPlan plan, PrintStream ps) {
this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>(),
new HashSet<Operator>());
}
private DotSparkPrinter(SparkOperPlan plan, PrintStream ps, boolean isSubGraph,
Set<Operator> subgraphs,
Set<Operator> multiInputSubgraphs,
Set<Operator> multiOutputSubgraphs) {
super(plan, ps, isSubGraph, subgraphs,
multiInputSubgraphs, multiOutputSubgraphs);
}
@Override
public void setVerbose(boolean verbose) {
// leave the parents verbose set to true
isVerboseNesting = verbose;
}
@Override
protected DotPlanDumper makeDumper(InnerPlan plan, PrintStream ps) {
return new InnerPrinter(plan, ps, mSubgraphs, mMultiInputSubgraphs,
mMultiOutputSubgraphs);
}
@Override
protected String getName(SparkOperator op) {
String name = op.name();
// Cut of the part of the name specifying scope.
String delimiter = " - ";
String[] temp;
temp = name.split(delimiter);
return temp[0];
}
@Override
protected Collection<InnerPlan> getNestedPlans(SparkOperator op) {
Collection<InnerPlan> plans = new LinkedList<InnerPlan>();
plans.add(new InnerPlan(op.physicalPlan));
return plans;
}
@Override
protected String[] getAttributes(SparkOperator op) {
String[] attributes = new String[3];
attributes[0] = "label=\""+getName(op)+"\"";
attributes[1] = "style=\"filled\"";
attributes[2] = "fillcolor=\"#EEEEEE\"";
return attributes;
}
/**
* Helper class to represent the relationship of inner operators
*/
public static class InnerOperator extends Operator<PlanVisitor> {
private static final long serialVersionUID = 1L;
String name;
PhysicalPlan plan;
int code;
public InnerOperator(PhysicalPlan plan, String name) {
super(new OperatorKey());
this.name = name;
this.plan = plan;
this.code = counter++;
}
@Override public void visit(PlanVisitor v) {}
@Override public boolean supportsMultipleInputs() {return false;}
@Override public boolean supportsMultipleOutputs() {return false;}
@Override public String name() {return name;}
public PhysicalPlan getPlan() {return plan;}
@Override public int hashCode() {return code;}
}
/**
* Each spark operator will have and an inner plan of inner
* operators. The inner operators contain the physical plan of the
* execution phase.
*/
public static class InnerPlan extends OperatorPlan<InnerOperator> {
private static final long serialVersionUID = 1L;
public InnerPlan(PhysicalPlan plan) {
InnerOperator sparkInnerOp = new InnerOperator(plan, "spark");
this.add(sparkInnerOp);
}
}
private class InnerPrinter extends DotPlanDumper<InnerOperator, InnerPlan,
PhysicalOperator, PhysicalPlan> {
public InnerPrinter(InnerPlan plan, PrintStream ps,
Set<Operator> subgraphs,
Set<Operator> multiInputSubgraphs,
Set<Operator> multiOutputSubgraphs) {
super(plan, ps, true, subgraphs, multiInputSubgraphs,
multiOutputSubgraphs);
}
@Override
protected String[] getAttributes(InnerOperator op) {
String[] attributes = new String[3];
attributes[0] = "label=\""+super.getName(op)+"\"";
attributes[1] = "style=\"filled\"";
attributes[2] = "fillcolor=\"white\"";
return attributes;
}
@Override
protected Collection<PhysicalPlan> getNestedPlans(InnerOperator op) {
Collection<PhysicalPlan> l = new LinkedList<PhysicalPlan>();
l.add(op.getPlan());
return l;
}
@Override
protected DotPOPrinter makeDumper(PhysicalPlan plan, PrintStream ps) {
DotPOPrinter printer = new DotPOPrinter(plan, ps, true,
mSubgraphs,
mMultiInputSubgraphs,
mMultiOutputSubgraphs);
printer.setVerbose(isVerboseNesting);
return printer;
}
}
}