| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.impl.plan; |
| |
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import org.apache.pig.impl.util.MultiMap;
| |
| |
| //import org.apache.commons.collections.map.MultiValueMap; |
| |
| /** |
| * A generic graphing class for use by LogicalPlan, PhysicalPlan, etc. One |
| * important aspect of this package is that it guarantees that once a graph is |
| * constructed, manipulations on that graph will maintain the ordering of |
| * inputs and outputs for a given node. That is, if a node has two inputs, 0 |
| * and 1, it is guaranteed that every time it asks for its inputs, it will |
| * receive them in the same order. This allows operators that need to |
| * distinguish their inputs (such as binary operators that need to know left |
| * from right) to work without needing to store their inputs themselves. This |
| * is an extra burden on the graph package and not in line with the way graphs |
| * are generally understood mathematically. But it greatly reduces the need |
| * for graph manipulators (such as the validators and optimizers) to |
| * understand the internals of various nodes. |
| */ |
| |
| //TODO |
| /* |
| * The graph operations swap, insertBetween, pushBefore, etc. have to be re-implemented |
| * in a layered fashion. The layering will facilitate the re-use of operations. In addition, |
| * use of operator.rewire in the aforementioned operations requires transaction like ability |
| * due to various pre-conditions. Often, the result of one of the operations leaves the |
| * graph in an inconsistent state for the rewire operation. Clear layering and assignment |
| * of the ability to rewire will remove these inconsistencies. For now, use of rewire |
| * has resulted in a slightly less maintainable code along with the necessity to use |
| * rewire with discretion. |
| */ |
| |
| // Suppress "unchecked" warnings for all logical plan related classes. Will revisit in logical plan rework |
| @SuppressWarnings("unchecked") |
| public abstract class OperatorPlan<E extends Operator> implements Iterable<E>, Serializable, Cloneable { |
| private static final long serialVersionUID = 1L; |
| protected Map<E, OperatorKey> mOps; |
| protected Map<OperatorKey, E> mKeys; |
| protected MultiMap<E, E> mFromEdges; |
| protected MultiMap<E, E> mToEdges; |
| protected MultiMap<E, E> mSoftFromEdges; |
| protected MultiMap<E, E> mSoftToEdges; |
| |
| private List<E> mRoots; |
| private List<E> mLeaves; |
| protected static final Log log = LogFactory.getLog(OperatorPlan.class); |
| |
/**
 * Creates an empty plan: no operators, no regular or soft edges, and
 * empty root/leaf caches.
 */
public OperatorPlan() {
    // Operator <-> key lookup tables.
    mOps = new HashMap<E, OperatorKey>();
    mKeys = new HashMap<OperatorKey, E>();

    // Regular (data flow) edges, indexed in both directions.
    mFromEdges = new MultiMap<E, E>();
    mToEdges = new MultiMap<E, E>();

    // Soft edges, indexed in both directions.
    mSoftFromEdges = new MultiMap<E, E>();
    mSoftToEdges = new MultiMap<E, E>();

    // Lazily filled caches used by getRoots() / getLeaves().
    mRoots = new ArrayList<E>();
    mLeaves = new ArrayList<E>();
}
| |
| /** |
| * Get a list of all nodes in the graph that are roots. A root is defined to |
| * be a node that has no input. |
| */ |
| public List<E> getRoots() { |
| if (mRoots.size() == 0 && mOps.size() > 0) { |
| for (E op : mOps.keySet()) { |
| if (mToEdges.get(op) == null) { |
| mRoots.add(op); |
| } |
| } |
| } |
| return mRoots; |
| } |
| |
| /** |
| * Get a list of all nodes in the graph that are leaves. A leaf is defined to |
| * be a node that has no output. |
| */ |
| public List<E> getLeaves() { |
| if (mLeaves.size() == 0 && mOps.size() > 0) { |
| for (E op : mOps.keySet()) { |
| if (mFromEdges.get(op) == null) { |
| mLeaves.add(op); |
| } |
| } |
| } |
| return mLeaves; |
| } |
| |
| /** |
| * Given an operator, find its OperatorKey. |
| * @param op Logical operator. |
| * @return associated OperatorKey |
| */ |
| public OperatorKey getOperatorKey(E op) { |
| return mOps.get(op); |
| } |
| |
| /** |
| * Given an OperatorKey, find the associated operator. |
| * @param opKey OperatorKey |
| * @return associated operator. |
| */ |
| public E getOperator(OperatorKey opKey) { |
| return mKeys.get(opKey); |
| } |
| |
| /** |
| * Get the map of operator key and associated operators |
| * @return map of operator key and operators. |
| */ |
| public Map<OperatorKey, E> getKeys() { |
| return mKeys; |
| } |
| |
| /** |
| * Insert an operator into the plan. This only inserts it as a node in |
| * the graph, it does not connect it to any other operators. That should |
| * be done as a separate step using connect. |
| * @param op Operator to add to the plan. |
| */ |
| public void add(E op) { |
| markDirty(); |
| mOps.put(op, op.getOperatorKey()); |
| mKeys.put(op.getOperatorKey(), op); |
| } |
| |
| /** |
| * Create an edge between two nodes. The direction of the edge implies data |
| * flow. |
| * @param from Operator data will flow from. |
| * @param to Operator data will flow to. |
| * @throws PlanException if this edge will create multiple inputs for an |
| * operator that does not support multiple inputs or create multiple outputs |
| * for an operator that does not support multiple outputs. |
| */ |
| public void connect(E from, E to) throws PlanException { |
| markDirty(); |
| |
| // Check that both nodes are in the plan. |
| checkInPlan(from); |
| checkInPlan(to); |
| |
| // Check to see if the from operator already has outputs, and if so |
| // whether it supports multiple outputs. |
| if (mFromEdges.get(from) != null && |
| !from.supportsMultipleOutputs()) { |
| PlanException pe = new PlanException("Attempt to give operator of type " + |
| from.getClass().getName() + " multiple outputs. This operator does " |
| + "not support multiple outputs."); |
| log.error(pe.getMessage()); |
| throw pe; |
| } |
| |
| // Check to see if the to operator already has inputs, and if so |
| // whether it supports multiple inputs. |
| if (mToEdges.get(to) != null && |
| !to.supportsMultipleInputs()) { |
| PlanException pe = new PlanException("Attempt to give operator of type " + |
| to.getClass().getName() + " multiple inputs. This operator does " |
| + "not support multiple inputs."); |
| log.error(pe.getMessage()); |
| throw pe; |
| } |
| mFromEdges.put(from, to); |
| mToEdges.put(to, from); |
| } |
| |
| /** |
| * Create an soft edge between two nodes. |
| * @param from Operator dependent upon. |
| * @param to Operator having the dependency. |
| * @throws PlanException if the nodes is not in plan |
| */ |
| public void createSoftLink(E from, E to) throws PlanException { |
| // Check that both nodes are in the plan. |
| checkInPlan(from); |
| checkInPlan(to); |
| |
| mSoftFromEdges.put(from, to); |
| mSoftToEdges.put(to, from); |
| } |
| |
| /** |
| * Remove an soft edge |
| * @param from Operator dependent upon |
| * @param to Operator having the dependency |
| */ |
| public void removeSoftLink(E from, E to) { |
| mSoftFromEdges.remove(from, to); |
| mSoftToEdges.remove(to, from); |
| } |
| |
| /** |
| * Remove an edge from between two nodes. |
| * Use {@link org.apache.pig.impl.plan.OperatorPlan#insertBetween(Operator, Operator, Operator)} |
| * if disconnect is used in the process of inserting a new node between two nodes |
| * by calling disconnect followed by a connect. |
| * @param from Operator data would flow from. |
| * @param to Operator data would flow to. |
| * @return true if the nodes were connected according to the specified data |
| * flow, false otherwise. |
| */ |
| public boolean disconnect(E from, E to) { |
| markDirty(); |
| |
| boolean sawNull = false; |
| if (mFromEdges.remove(from, to) == null) sawNull = true; |
| if (mToEdges.remove(to, from) == null) sawNull = true; |
| |
| return !sawNull; |
| } |
| |
| /** |
| * Remove an operator from the plan. Any edges that the node has will |
| * be removed as well. |
| * @param op Operator to remove. |
| */ |
| public void remove(E op) { |
| markDirty(); |
| |
| removeEdges(op, mFromEdges, mToEdges); |
| removeEdges(op, mToEdges, mFromEdges); |
| |
| removeEdges(op, mSoftFromEdges, mSoftToEdges); |
| removeEdges(op, mSoftToEdges, mSoftFromEdges); |
| |
| // Remove the operator from nodes |
| mOps.remove(op); |
| mKeys.remove(op.getOperatorKey()); |
| } |
| |
| /** |
| * Trim everything below a given operator. The specified operator will |
| * NOT be removed. |
| * @param op Operator to trim everything after. |
| */ |
| public void trimBelow(E op) { |
| trimBelow(getSuccessors(op)); |
| } |
| |
| private void trimBelow(List<E> ops) { |
| if (ops != null) { |
| // Make a copy because we'll be messing with the underlying list. |
| List<E> copy = new ArrayList<E>(ops); |
| for (E op : copy) { |
| trimBelow(getSuccessors(op)); |
| remove(op); |
| } |
| } |
| } |
| |
| /** |
| * Move everything below a given operator to the new operator plan. The specified operator will |
| * be moved and will be the root of the new operator plan |
| * @param root Operator to move everything after |
| * @param newPlan new operator plan to move things into |
| * @throws PlanException |
| */ |
| public void moveTree(E root, OperatorPlan<E> newPlan) throws PlanException { |
| newPlan.add(root); |
| if (getSuccessors(root) == null) { |
| remove(root); |
| return; |
| } |
| |
| List<E> succs = new ArrayList<E>(); |
| succs.addAll(getSuccessors(root)); |
| |
| for (E succ : succs) { |
| moveTree(succ, newPlan); |
| } |
| remove(root); |
| for (E succ : succs) { |
| newPlan.connect(root, succ); |
| } |
| } |
| |
| /** |
| * Trim everything above a given operator. The specified operator will |
| * NOT be removed. |
| * @param op Operator to trim everything before. |
| */ |
| public void trimAbove(E op) { |
| List<E> predecessors = new ArrayList<E>(getPredecessors(op)); |
| IndexHelper<E> indexHelper = new IndexHelper<E>(predecessors); |
| trimAbove(predecessors); |
| for(E predecessor: predecessors) { |
| try { |
| op.rewire(predecessor, indexHelper.getIndex(predecessor), null, true); |
| } catch (PlanException pe) { |
| //TODO |
| //need to change the method signature to include the exception clause |
| //for now, throwing RunTimeException to workaround this issue |
| throw new RuntimeException("Encountered problems with rewiring operators.", pe); |
| } |
| } |
| } |
| |
| private void trimAbove(List<E> ops) { |
| if (ops != null) { |
| // Make a copy because we'll be messing with the underlying list. |
| List<E> copy = new ArrayList<E>(ops); |
| for (E op : copy) { |
| trimAbove(getPredecessors(op)); |
| remove(op); |
| } |
| } |
| } |
| |
| |
| /** |
| * Find all of the nodes that have edges to the indicated node from |
| * themselves. |
| * @param op Node to look to |
| * @return Collection of nodes. |
| */ |
| public List<E> getPredecessors(E op) { |
| return mToEdges.get(op); |
| } |
| |
| |
| /** |
| * Find all of the nodes that have edges from the indicated node to |
| * themselves. |
| * @param op Node to look from |
| * @return Collection of nodes. |
| */ |
| public List<E> getSuccessors(E op) { |
| return mFromEdges.get(op); |
| } |
| |
| /** |
| * Find all of the nodes that have soft edges to the indicated node from |
| * themselves. |
| * @param op Node to look to |
| * @return Collection of nodes. |
| */ |
| public List<E> getSoftLinkPredecessors(E op) { |
| return mSoftToEdges.get(op); |
| } |
| |
| |
| /** |
| * Find all of the nodes that have soft edges from the indicated node to |
| * themselves. |
| * @param op Node to look from |
| * @return Collection of nodes. |
| */ |
| public List<E> getSoftLinkSuccessors(E op) { |
| return mSoftFromEdges.get(op); |
| } |
| |
| /** |
| * A method to check if there is a path from a given node to another node |
| * @param from the start node for checking |
| * @param to the end node for checking |
| * @return true if path exists, false otherwise |
| */ |
| public boolean pathExists(E from, E to) { |
| List<E> successors = getSuccessors(from); |
| if(successors == null || successors.size() == 0) { |
| return false; |
| } |
| for (E successor : successors) { |
| if(successor.equals(to) |
| || pathExists(successor, to)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| public Iterator<E> iterator() { |
| return mOps.keySet().iterator(); |
| } |
| |
| private void markDirty() { |
| mRoots.clear(); |
| mLeaves.clear(); |
| } |
| |
| private void removeEdges(E op, |
| MultiMap<E, E> fromMap, |
| MultiMap<E, E> toMap) { |
| // Find all of the from edges, as I have to remove all the associated to |
| // edges. Need to make a copy so we can delete from the map without |
| // screwing up our iterator. |
| Collection<E> c = fromMap.get(op); |
| if (c == null) return; |
| |
| ArrayList<E> al = new ArrayList<E>(c); |
| Iterator<E> i = al.iterator(); |
| while (i.hasNext()) { |
| E to = i.next(); |
| toMap.remove(to, op); |
| fromMap.remove(op, to); |
| } |
| } |
| |
| private void checkInPlan(E op) throws PlanException { |
| if (mOps.get(op) == null) { |
| PlanException pe = new PlanException("Attempt to connect operator " + |
| op.name() + " which is not in the plan."); |
| log.error(pe.getMessage()); |
| throw pe; |
| } |
| } |
| |
| /** |
| * Merges the operators in the incoming operPlan with |
| * this plan's operators. By merging I mean just making |
| * a combined graph with each one as a component |
| * It doesn't support merging of shared plans |
| * @param inpPlan |
| * @return this pointer |
| * @throws PlanException |
| */ |
| public OperatorPlan<E> merge(OperatorPlan<E> inpPlan) throws PlanException { |
| return doMerge(inpPlan, false); |
| } |
| |
| /** |
| * Merges the operators in the incoming plan with this plan's operators. |
| * The plans can have shared components. |
| * |
| * @param inpPlan |
| * @return this pointer |
| * @throws PlanException |
| */ |
| public OperatorPlan<E> mergeSharedPlan(OperatorPlan<E> inpPlan) throws PlanException { |
| return doMerge(inpPlan, true); |
| } |
| |
| private OperatorPlan<E> doMerge(OperatorPlan<E> inpPlan, boolean allowSharedPlan) throws PlanException { |
| Map<E, OperatorKey> inpOps = inpPlan.mOps; |
| Set<E> curOpsKeySet = mOps.keySet(); |
| for (Map.Entry<E, OperatorKey> mapEnt : inpOps.entrySet()) { |
| if (curOpsKeySet.contains(mapEnt.getKey())) { |
| if (!allowSharedPlan) { |
| PlanException pe = new PlanException( |
| "There are operators that are shared across the plans. Merge of " |
| + "mutually exclusive plans is the only supported merge."); |
| log.error(pe.getMessage()); |
| throw pe; |
| } |
| } else { |
| mOps.put(mapEnt.getKey(), mapEnt.getValue()); |
| } |
| } |
| |
| Map<OperatorKey, E> inpKeys = inpPlan.mKeys; |
| Set<OperatorKey> curOKKeySet = mKeys.keySet(); |
| for (Map.Entry<OperatorKey, E> mapEnt : inpKeys.entrySet()) { |
| if (curOKKeySet.contains(mapEnt.getKey())) { |
| if (!allowSharedPlan) { |
| PlanException pe = new PlanException( |
| "There are operators that are shared across the plans. Merge of " |
| + "mutually exclusive plans is the only supported merge."); |
| log.error(pe.getMessage()); |
| throw pe; |
| } |
| } else { |
| mKeys.put(mapEnt.getKey(), mapEnt.getValue()); |
| } |
| } |
| |
| MultiMap<E, E> inpFromEdges = inpPlan.mFromEdges; |
| Set<E> curFEKeySet = mFromEdges.keySet(); |
| for (E fromEdg : inpFromEdges.keySet()) { |
| if (curFEKeySet.contains(fromEdg) && !allowSharedPlan) { |
| PlanException pe = new PlanException( |
| "There are operators that are shared across the plans. Merge of " |
| + "mutually exclusive plans is the only supported merge."); |
| log.error(pe.getMessage()); |
| throw pe; |
| } |
| |
| for (E e : inpFromEdges.get(fromEdg)) { |
| if (mFromEdges.get(fromEdg) == null || !mFromEdges.get(fromEdg).contains(e)) { |
| mFromEdges.put(fromEdg, e); |
| } |
| } |
| } |
| |
| MultiMap<E, E> inpToEdges = inpPlan.mToEdges; |
| Set<E> curTEKeySet = mToEdges.keySet(); |
| for (E toEdg : inpToEdges.keySet()) { |
| if (curTEKeySet.contains(toEdg) && !allowSharedPlan) { |
| PlanException pe = new PlanException( |
| "There are operators that are shared across the plans. Merge of " |
| + "mutually exclusive plans is the only supported merge."); |
| log.error(pe.getMessage()); |
| throw pe; |
| } |
| |
| for (E e : inpToEdges.get(toEdg)) { |
| if (mToEdges.get(toEdg) == null || !mToEdges.get(toEdg).contains(e)) { |
| mToEdges.put(toEdg, e); |
| } |
| } |
| } |
| |
| markDirty(); |
| return this; |
| } |
| |
| |
| /** |
| * Utility method heavily used in the MRCompiler |
| * Adds the leaf operator to the plan and connects |
| * all existing leaves to the new leaf |
| * @param leaf |
| * @throws PlanException |
| */ |
| public void addAsLeaf(E leaf) throws PlanException { |
| List<E> ret = new ArrayList<E>(); |
| for (E operator : getLeaves()) { |
| ret.add(operator); |
| } |
| add(leaf); |
| for (E oper : ret) { |
| connect(oper, leaf); |
| } |
| } |
| |
| public boolean isSingleLeafPlan() { |
| List<E> tmpList = getLeaves() ; |
| return tmpList.size() == 1 ; |
| } |
| |
| public int size() { |
| return mKeys.size() ; |
| } |
| |
| /** |
| * Given two connected nodes add another node between them. |
| * 'newNode' will be placed in same position in predecessor list as 'before' (old node). |
| * @param after Node to insert this node after |
| * @param newNode new node to insert. This node must have already been |
| * added to the plan. |
| * @param before Node to insert this node before |
| * @throws PlanException if it encounters trouble disconnecting or |
| * connecting nodes. |
| */ |
| public void insertBetween( |
| E after, |
| E newNode, |
| E before) throws PlanException { |
| doInsertBetween(after, newNode, before, true); |
| } |
| |
| /* |
| * Private method to perform the insertBetween operation with the ability to turn off |
| * rewiring operation. |
| */ |
| public void doInsertBetween( |
| E after, |
| E newNode, |
| E before, |
| boolean rewire) throws PlanException { |
| checkInPlan(newNode); |
| List<E> newNodePreds = getPredecessors(newNode); |
| //assuming that the newNode has zero or one predecessor |
| E newNodePred = (newNodePreds == null? null: newNodePreds.get(0)); |
| if (!replaceNode(after, newNode, before, mFromEdges) || !replaceNode(before, newNode, after, mToEdges)) { |
| int errCode = 1094; |
| String msg = "Attempt to insert between two nodes " + |
| "that were not connected."; |
| PlanException pe = new PlanException(msg, errCode, PigException.INPUT); |
| throw pe; |
| } |
| mFromEdges.put(newNode, before); |
| mToEdges.put(newNode, after); |
| |
| if(rewire) { |
| if((newNodePred != null) && !(newNodePred.equals(after))) { |
| newNodePred.regenerateProjectionMap(); |
| newNode.rewire(newNodePred, 0, after, true); |
| } |
| newNode.regenerateProjectionMap(); |
| IndexHelper<E> indexHelper = new IndexHelper<E>(getPredecessors(newNode)); |
| before.rewire(after, indexHelper.getIndex(after), newNode, false); |
| } |
| } |
| |
| // replaces (src -> dst) entry in multiMap with (src -> replacement) |
| private boolean replaceNode(E src, E replacement, E dst, MultiMap<E, E> multiMap) { |
| if(multiMap == null) return false; |
| |
| if(src == null) return false; |
| |
| List<E> nodes = (ArrayList<E>)multiMap.get(src); |
| if (nodes == null) { |
| //we need to add replacement to the multimap as long as replacement != null |
| if(replacement == null) { |
| return false; |
| } else if (dst == null) { |
| ArrayList<E> replacementNodes = new ArrayList<E>(); |
| replacementNodes.add(replacement); |
| multiMap.put(src, replacementNodes); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| if(dst == null) return false; |
| |
| boolean replaced = false; |
| ArrayList<E> replacementNodes = new ArrayList<E>(); |
| for(int i = 0; i < nodes.size(); ++i) { |
| E to = nodes.get(i); |
| if(to.equals(dst)) { |
| replaced = true; |
| if(replacement != null) { |
| replacementNodes.add(replacement); |
| } |
| } else { |
| replacementNodes.add(to); |
| } |
| } |
| |
| if(replaced) { |
| multiMap.removeKey(src); |
| if(replacementNodes.size() > 0) { |
| multiMap.put(src, replacementNodes); |
| } |
| } |
| return replaced; |
| } |
| |
| /** |
| * Replace an existing node in the graph with a new node. The new node |
| * will be connected to all the nodes the old node was. The old node will |
| * be removed. The new node is assumed to have no incoming or outgoing edges |
| * @param oldNode Node to be replaced |
| * @param newNode Node to add in place of oldNode |
| * @throws PlanException |
| */ |
| public void replace(E oldNode, E newNode) throws PlanException { |
| checkInPlan(oldNode); |
| add(newNode); |
| List<E> oldNodeSuccs = (getSuccessors(oldNode) == null? null : new ArrayList<E>(getSuccessors(oldNode))); |
| List<IndexHelper<E>> indexHelpers = new ArrayList<IndexHelper<E>>(); |
| if(oldNodeSuccs != null) { |
| for(int i = 0; i < oldNodeSuccs.size(); ++i) { |
| E oldNodeSucc = oldNodeSuccs.get(i); |
| indexHelpers.add(new IndexHelper<E>(new ArrayList<E>(getPredecessors(oldNodeSucc)))); |
| } |
| } |
| |
| |
| mToEdges = generateNewMap(oldNode, newNode, mToEdges); |
| mFromEdges = generateNewMap(oldNode, newNode, mFromEdges); |
| |
| //ensure that the oldNode's successors are rewired |
| if(oldNodeSuccs != null) { |
| for(int i = 0; i < oldNodeSuccs.size(); ++i) { |
| E oldNodeSucc = oldNodeSuccs.get(i); |
| oldNodeSucc.rewire(oldNode, indexHelpers.get(i).getIndex(oldNode), newNode, true); |
| } |
| } |
| remove(oldNode); |
| |
| } |
| |
| private MultiMap<E, E> generateNewMap( |
| E oldNode, |
| E newNode, |
| MultiMap<E, E> mm) { |
| // First, replace the key |
| Collection<E> targets = mm.get(oldNode); |
| if (targets != null) { |
| mm.removeKey(oldNode); |
| mm.put(newNode, targets); |
| } |
| |
| // We can't just do a remove and add in the map because of our |
| // guarantee of not changing orders. So we need to walk the lists and |
| // put the new node in the same slot as the old. |
| |
| // Walk all the other keys and replace any references to the oldNode |
| // in their targets. |
| MultiMap<E, E> newMap = new MultiMap<E, E>(mm.size()); |
| for (E key : mm.keySet()) { |
| Collection<E> c = mm.get(key); |
| ArrayList<E> al = new ArrayList<E>(c); |
| for (int i = 0; i < al.size(); i++) { |
| if (al.get(i) == oldNode) al.set(i, newNode); |
| } |
| newMap.put(key, al); |
| } |
| return newMap; |
| } |
| |
| /** |
| * Remove a node in a way that connects the node's predecessor (if any) |
| * with the node's successor (if any). This function does not handle the |
| * case where the node has multiple predecessors or successors. |
| * @param node Node to be removed |
| * @throws PlanException if the node has more than one predecessor or |
| * successor. |
| */ |
| public void removeAndReconnect(E node) throws PlanException { |
| List<E> preds = getPredecessors(node); |
| E pred = null; |
| if (preds != null) { |
| if (preds.size() > 1) { |
| int errCode = 1095; |
| String msg = "Attempt to remove " + |
| " and reconnect for node with multiple predecessors."; |
| PlanException pe = new PlanException(msg, errCode, PigException.INPUT); |
| throw pe; |
| } |
| pred = preds.get(0); |
| disconnect(pred, node); |
| } |
| |
| int oldPos = -1; |
| int newPos = -1; |
| |
| List<E> succs = getSuccessors(node); |
| E succ = null; |
| if (succs != null) { |
| if (succs.size() > 1) { |
| int errCode = 1095; |
| String msg = "Attempt to remove " + |
| " and reconnect for node with multiple successors."; |
| PlanException pe = new PlanException(msg, errCode, PigException.INPUT); |
| throw pe; |
| } |
| succ = succs.get(0); |
| List<E> plst = getPredecessors(succ); |
| for (int i=0; i<plst.size(); i++) { |
| if (plst.get(i).equals(node)) { |
| oldPos = i; |
| } |
| } |
| disconnect(node, succ); |
| } |
| |
| remove(node); |
| if (pred != null && succ != null) { |
| connect(pred, succ); |
| List<E> plst = getPredecessors(succ); |
| for (int i=0; i<plst.size(); i++) { |
| if (plst.get(i).equals(pred)) { |
| newPos = i; |
| } |
| } |
| |
| if (oldPos < 0 || newPos < 0) { |
| throw new PlanException("Invalid position index: " + oldPos |
| + " : " + newPos); |
| } |
| |
| if (oldPos != newPos) { |
| List<E> nlst = new ArrayList<E>(); |
| for (int i=0; i<plst.size(); i++) { |
| E nod = plst.get(i); |
| if (i == oldPos) { |
| nlst.add(pred); |
| } |
| if (i == newPos) continue; |
| nlst.add(nod); |
| } |
| |
| if (nlst.size() != plst.size()) { |
| throw new PlanException("Invalid list size: " + nlst.size() |
| + " : " + plst.size()); |
| } |
| |
| mToEdges.removeKey(succ); |
| mToEdges.put(succ, nlst); |
| } |
| |
| succ.rewire(node, oldPos, pred, true); |
| } |
| } |
| |
| private void reconnectSuccessors(E node, boolean successorRequired, boolean removeNode) throws PlanException { |
| // Before: |
| // A (predecessor (only one) ) |
| // / | |
| // X B(nodeB) Y(some predecessor of a Cn) |
| // / | \ / |
| // C1 C2 C3 ... (Successors) |
| // should become |
| // After: |
| // ___ A Y |
| // / / | \ / |
| // X C1 C2 C3 ... |
| // the variable names are from above example |
| |
| E nodeB = node; |
| List<E> preds = getPredecessors(nodeB); |
| //checking pre-requisite conditions |
| if (preds == null || preds.size() != 1) { |
| Integer size = null; |
| if(preds != null) |
| size = preds.size(); |
| int errCode = 1096; |
| String msg = "Attempt to remove " |
| + " and reconnect for node with " + size |
| + " predecessors."; |
| PlanException pe = new PlanException(msg, errCode, PigException.INPUT); |
| throw pe; |
| } |
| |
| //A and C |
| E nodeA = preds.get(0); |
| Collection<E> nodeC = (mFromEdges.get(nodeB) == null? null : new ArrayList<E>(mFromEdges.get(nodeB))); |
| |
| //checking pre-requisite conditions |
| if(successorRequired) { |
| if (nodeC == null || nodeC.size() == 0) { |
| int errCode = 1096; |
| String msg = "Attempt to remove " + |
| " and reconnect for node with no successors."; |
| PlanException pe = new PlanException(msg, errCode, PigException.INPUT); |
| throw pe; |
| } |
| } |
| |
| List<IndexHelper<E>> indexHelpers = new ArrayList<IndexHelper<E>>(); |
| if(nodeC != null) { |
| for(int i = 0; i < nodeC.size(); ++i) { |
| E c = ((List<E>)nodeC).get(i); |
| indexHelpers.add(new IndexHelper<E>(new ArrayList<E>(getPredecessors(c)))); |
| } |
| } |
| |
| // replace B in A.succesors and add B.successors(ie C) to it |
| replaceAndAddSucessors(nodeA, nodeB); |
| |
| // for all C(succs) , replace B(node) in predecessors, with A(pred) |
| if(nodeC != null) { |
| for(int i = 0; i < nodeC.size(); ++i) { |
| E c = ((List<E>)nodeC).get(i); |
| Collection<E> sPreds = mToEdges.get(c); |
| ArrayList<E> newPreds = new ArrayList<E>(sPreds.size()); |
| for(E p: sPreds){ |
| if(p == nodeB){ |
| //replace |
| newPreds.add(nodeA); |
| } |
| else{ |
| newPreds.add(p); |
| } |
| } |
| mToEdges.removeKey(c); |
| mToEdges.put(c,newPreds); |
| |
| } |
| } |
| |
| if(removeNode) { |
| remove(nodeB); |
| } else { |
| //make sure that the node does not have any dangling from and to edges |
| mFromEdges.removeKey(nodeB); |
| mToEdges.removeKey(nodeB); |
| } |
| |
| //ensure that any existing successor of nodeB is rewired to have nodeA in place of nodeB |
| if(nodeC != null) { |
| for(int i = 0; i < nodeC.size(); ++i) { |
| E c = ((List<E>)nodeC).get(i); |
| c.rewire(nodeB, indexHelpers.get(i).getIndex(nodeB), nodeA, true); |
| } |
| } |
| } |
| |
| private void reconnectPredecessors(E node, boolean predecessorRequired, boolean removeNode) throws PlanException { |
| // Before: |
| // C1 C2 C3 ... (Predecessors) |
| // \ | / \ |
| // X B(nodeB) Y(some successor of a Cn) |
| // \ | |
| // A (successor (only one) ) |
| |
| |
| // should become |
| // After: |
| // X C1 C2 C3 ... |
| // \ \ | / \ |
| // A Y |
| // the variable names are from above example |
| |
| E nodeB = node; |
| List<E> nodeBsuccessors = getSuccessors(nodeB); |
| //checking pre-requisite conditions |
| if (nodeBsuccessors == null || nodeBsuccessors.size() != 1) { |
| Integer size = null; |
| if(nodeBsuccessors != null) |
| size = nodeBsuccessors.size(); |
| |
| int errCode = 1096; |
| String msg = "Attempt to remove " |
| + " and reconnect for node with " + size + " successors."; |
| PlanException pe = new PlanException(msg, errCode, PigException.INPUT); |
| throw pe; |
| } |
| |
| //A and C |
| E nodeA = nodeBsuccessors.get(0); |
| Collection<E> nodeC = (mToEdges.get(nodeB) == null? null : new ArrayList<E>(mToEdges.get(nodeB))); |
| |
| //checking pre-requisite conditions |
| if(predecessorRequired) { |
| if (nodeC == null || nodeC.size() == 0) { |
| int errCode = 1096; |
| String msg = "Attempt to remove " |
| + " and reconnect for node with no predecessors."; |
| PlanException pe = new PlanException(msg, errCode, PigException.INPUT); |
| throw pe; |
| } |
| } |
| |
| |
| // replace B in A.predecessors and add B.predecessors(ie C) to it |
| replaceAndAddPredecessors(nodeA, nodeB); |
| |
| // for all C(predecessors) , replace B(node) in successors, with A(successor) |
| if(nodeC != null) { |
| |
| for(E c: nodeC) { |
| Collection<E> sPreds = mFromEdges.get(c); |
| ArrayList<E> newPreds = new ArrayList<E>(sPreds.size()); |
| for(E p: sPreds){ |
| if(p == nodeB){ |
| //replace |
| newPreds.add(nodeA); |
| } |
| else{ |
| newPreds.add(p); |
| } |
| } |
| mFromEdges.removeKey(c); |
| mFromEdges.put(c,newPreds); |
| |
| //rewire nodeA |
| nodeA.rewire(nodeB, 0, c, true); |
| } |
| } |
| |
| if(removeNode) { |
| remove(nodeB); |
| } else { |
| //make sure that the node does not have any dangling from and to edges |
| mFromEdges.removeKey(nodeB); |
| mToEdges.removeKey(nodeB); |
| } |
| } |
| |
| // removes entry for successor in list of successors of node |
| // and adds successors of successor in its place |
| // @param node - parent node whose entry for successor needs to be replaced |
| // @param successor - see above |
| private void replaceAndAddSucessors(E node, E successor) throws PlanException { |
| Collection<E> oldSuccessors = mFromEdges.get(node); |
| Collection<E> replacementSuccessors = mFromEdges.get(successor); |
| ArrayList<E> newSuccessors = new ArrayList<E>(); |
| for(E s: oldSuccessors){ |
| if(s == successor){ |
| if(replacementSuccessors != null) { |
| newSuccessors.addAll(replacementSuccessors); |
| } |
| }else{ |
| newSuccessors.add(s); |
| } |
| } |
| mFromEdges.removeKey(node); |
| if (!newSuccessors.isEmpty()) { |
| mFromEdges.put(node,newSuccessors); |
| } |
| } |
| |
| // removes entry for predecessor in list of predecessors of node, |
| // and adds predecessors of predecessor in its place |
| // @param node - parent node whose entry for predecessor needs to be replaced |
| // @param predecessor - see above |
| private void replaceAndAddPredecessors(E node, E predecessor) throws PlanException { |
| Collection<E> oldPredecessors = mToEdges.get(node); |
| Collection<E> replacementPredecessors = mToEdges.get(predecessor); |
| ArrayList<E> newPredecessors = new ArrayList<E>(); |
| for(E p: oldPredecessors){ |
| if(p == predecessor){ |
| if(replacementPredecessors != null) { |
| newPredecessors.addAll(replacementPredecessors); |
| } |
| }else{ |
| newPredecessors.add(p); |
| } |
| } |
| mToEdges.removeKey(node); |
| if (!newPredecessors.isEmpty()) { |
| mToEdges.put(node,newPredecessors); |
| } |
| } |
| |
| /** |
| * Remove a node in a way that connects the node's predecessor (if any) |
| * with the node's successors (if any). This function handles the |
| * case where the node has *one* predecessor and one or more successors. |
| * It replaces the predecessor in same position as node was in |
| * each of the successors predecessor list(getPredecessors()), to |
| * preserve input ordering |
| * for eg, it is used to remove redundant project(*) from plan |
| * which will have only one predecessor,but can have multiple success |
| * @param node Node to be removed |
| * @throws PlanException if the node has more than one predecessor |
| */ |
| public void removeAndReconnectMultiSucc(E node) throws PlanException { |
| reconnectSuccessors(node, true, true); |
| } |
| |
| |
| |
| public void dump(PrintStream ps) { |
| ps.println("Ops"); |
| for (E op : mOps.keySet()) { |
| ps.println(op.name()); |
| } |
| ps.println("from edges"); |
| for (E op : mFromEdges.keySet()) { |
| for (E to : mFromEdges.get(op)) { |
| ps.println(op.name() + " -> " + to.name()); |
| } |
| } |
| ps.println("to edges"); |
| for (E op : mToEdges.keySet()) { |
| for (E to : mToEdges.get(op)) { |
| ps.println(op.name() + " -> " + to.name()); |
| } |
| } |
| } |
| |
| /** |
| * Swap two operators in a plan. Both of the operators must have single |
| * inputs and single outputs. |
| * @param first operator |
| * @param second operator |
| * @throws PlanException if either operator is not single input and output. |
| */ |
| public void swap(E first, E second) throws PlanException { |
| E firstNode = first; |
| E secondNode = second; |
| |
| if(firstNode == null) { |
| int errCode = 1092; |
| String msg = "First operator in swap is null. Cannot swap null operators."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| if(secondNode == null) { |
| int errCode = 1092; |
| String msg = "Second operator in swap is null. Cannot swap null operators."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| checkInPlan(firstNode); |
| checkInPlan(secondNode); |
| |
| List<E> firstNodePredecessors = (ArrayList<E>)mToEdges.get(firstNode); |
| |
| if(firstNodePredecessors != null && firstNodePredecessors.size() > 1) { |
| int errCode = 1093; |
| String msg = "Swap supports swap of operators with at most one input." |
| + " Found first operator with " + firstNodePredecessors.size() + " inputs."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| List<E> firstNodeSuccessors = (ArrayList<E>)mFromEdges.get(firstNode); |
| |
| if(firstNodeSuccessors != null && firstNodeSuccessors.size() > 1) { |
| int errCode = 1093; |
| String msg = "Swap supports swap of operators with at most one output." |
| + " Found first operator with " + firstNodeSuccessors.size() + " outputs."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| List<E> secondNodePredecessors = (ArrayList<E>)mToEdges.get(secondNode); |
| |
| if(secondNodePredecessors != null && secondNodePredecessors.size() > 1) { |
| int errCode = 1093; |
| String msg = "Swap supports swap of operators with at most one input." |
| + " Found second operator with " + secondNodePredecessors.size() + " inputs."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| List<E> secondNodeSuccessors = (ArrayList<E>)mFromEdges.get(secondNode); |
| |
| if(secondNodeSuccessors != null && secondNodeSuccessors.size() > 1) { |
| int errCode = 1093; |
| String msg = "Swap supports swap of operators with at most one output." |
| + " Found second operator with " + secondNodeSuccessors.size() + " outputs."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| E firstNodePredecessor = null; |
| E firstNodeSuccessor = null; |
| E secondNodePredecessor = null; |
| E secondNodeSuccessor = null; |
| |
| if(firstNodePredecessors != null) { |
| firstNodePredecessor = firstNodePredecessors.get(0); |
| } |
| |
| if(firstNodeSuccessors != null) { |
| firstNodeSuccessor = firstNodeSuccessors.get(0); |
| } |
| |
| if(secondNodePredecessors != null) { |
| secondNodePredecessor = secondNodePredecessors.get(0); |
| } |
| |
| if(secondNodeSuccessors != null) { |
| secondNodeSuccessor = secondNodeSuccessors.get(0); |
| } |
| |
| boolean immediateNodes = false; |
| |
| if((firstNodeSuccessor == secondNode) && (secondNodePredecessor == firstNode)) { |
| immediateNodes = true; |
| } else if ((secondNodeSuccessor == firstNode) && (firstNodePredecessor == secondNode)) { |
| immediateNodes = true; |
| //swap the firstNode and secondNode |
| E tmpNode = firstNode; |
| firstNode = secondNode; |
| secondNode = tmpNode; |
| |
| //swap the predecessor and successor nodes |
| tmpNode = firstNodePredecessor; |
| firstNodePredecessor = secondNodePredecessor; |
| secondNodePredecessor = tmpNode; |
| |
| tmpNode = firstNodeSuccessor; |
| firstNodeSuccessor = secondNodeSuccessor; |
| secondNodeSuccessor = tmpNode; |
| } |
| |
| if(immediateNodes) { |
| //Replace the predecessors and successors of first and second in their respective edge lists |
| replaceNode(firstNode, secondNodeSuccessor, firstNodeSuccessor, mFromEdges); |
| replaceNode(firstNode, secondNode, firstNodePredecessor, mToEdges); |
| replaceNode(secondNode, firstNode, secondNodeSuccessor, mFromEdges); |
| replaceNode(secondNode, firstNodePredecessor, secondNodePredecessor, mToEdges); |
| |
| //rewire the two nodes |
| secondNode.rewire(firstNode, 0, firstNodePredecessor, true); |
| secondNode.regenerateProjectionMap(); |
| firstNode.rewire(firstNodePredecessor, 0, secondNode, false); |
| |
| } else { |
| //Replace the predecessors and successors of first and second in their respective edge lists |
| replaceNode(firstNode, secondNodeSuccessor, firstNodeSuccessor, mFromEdges); |
| replaceNode(firstNode, secondNodePredecessor, firstNodePredecessor, mToEdges); |
| replaceNode(secondNode, firstNodeSuccessor, secondNodeSuccessor, mFromEdges); |
| replaceNode(secondNode, firstNodePredecessor, secondNodePredecessor, mToEdges); |
| |
| //rewire the two nodes |
| //here the use of true as the final parameter is questionable |
| //there is no knowledge about how to use the projection maps |
| firstNode.rewire(firstNodePredecessor, 0, secondNodePredecessor, true); |
| secondNode.rewire(secondNodePredecessor, 0, firstNodePredecessor, true); |
| } |
| |
| //Replace first with second in the edges list for first's predecessor and successor |
| replaceNode(firstNodePredecessor, secondNode, firstNode, mFromEdges); |
| replaceNode(firstNodeSuccessor, secondNode, firstNode, mToEdges); |
| |
| //Replace second with first in the edges list for second's predecessor and successor |
| replaceNode(secondNodePredecessor, firstNode, secondNode, mFromEdges); |
| replaceNode(secondNodeSuccessor, firstNode, secondNode, mToEdges); |
| |
| if(firstNodeSuccessor != null) { |
| //here the use of true as the final parameter is questionable |
| //there is no knowledge about how to use the projection maps |
| firstNodeSuccessor.rewire(firstNode, 0, secondNode, true); |
| } |
| |
| if(secondNodeSuccessor != null) { |
| //here the use of true as the final parameter is questionable |
| //there is no knowledge about how to use the projection maps |
| secondNodeSuccessor.rewire(secondNode, 0, firstNode, true); |
| } |
| |
| markDirty(); |
| } |
| |
| /** |
| * Push one operator in front of another. This function is for use when |
| * the first operator has multiple inputs. The caller can specify |
| * which input of the first operator the second operator should be pushed to. |
| * @param first operator, assumed to have multiple inputs. |
| * @param second operator, will be pushed in front of first |
| * @param inputNum indicates which input of the first operator the second |
| * operator will be pushed onto. Numbered from 0. |
| * @throws PlanException if inputNum does not exist for first operator |
| */ |
| public void pushBefore(E first, E second, int inputNum) throws PlanException { |
| E firstNode = first; |
| E secondNode = second; |
| |
| if(firstNode == null) { |
| int errCode = 1085; |
| String msg = "First operator in pushBefore is null. Cannot pushBefore null operators."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| if(secondNode == null) { |
| int errCode = 1085; |
| String msg = "Second operator in pushBefore is null. Cannot pushBefore null operators."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| checkInPlan(firstNode); |
| checkInPlan(secondNode); |
| |
| List<E> firstNodePredecessors = (mToEdges.get(firstNode) == null? null : new ArrayList<E>(mToEdges.get(firstNode))); |
| |
| if(firstNodePredecessors == null || firstNodePredecessors.size() <= 1) { |
| int size = (firstNodePredecessors == null ? 0 : firstNodePredecessors.size()); |
| int errCode = 1086; |
| String msg = "First operator in pushBefore should have multiple inputs." |
| + " Found first operator with " + size + " inputs."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| if(inputNum >= firstNodePredecessors.size()) { |
| int errCode = 1087; |
| String msg = "The inputNum " + inputNum + " should be lesser than the number of inputs of the first operator." |
| + " Found first operator with " + firstNodePredecessors.size() + " inputs."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| List<E> firstNodeSuccessors = (mFromEdges.get(firstNode) == null? null : new ArrayList<E>(mFromEdges.get(firstNode))); |
| |
| if(firstNodeSuccessors == null) { |
| int errCode = 1088; |
| String msg = "First operator in pushBefore should have at least one output." |
| + " Found first operator with no outputs."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| List<E> secondNodePredecessors = (mToEdges.get(secondNode) == null? null : new ArrayList<E>(mToEdges.get(secondNode))); |
| |
| if(secondNodePredecessors == null || secondNodePredecessors.size() > 1) { |
| int size = (secondNodePredecessors == null ? 0 : secondNodePredecessors.size()); |
| int errCode = 1088; |
| String msg = "Second operator in pushBefore should have one input." |
| + " Found second operator with " + size + " inputs."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| List<E> secondNodeSuccessors = (mFromEdges.get(secondNode) == null? null : new ArrayList<E>(mFromEdges.get(secondNode))); |
| |
| //check for multiple edges from first to second |
| int edgesFromFirstToSecond = 0; |
| for(E node: firstNodeSuccessors) { |
| if(node == secondNode) { |
| ++edgesFromFirstToSecond; |
| } |
| } |
| |
| if(edgesFromFirstToSecond == 0) { |
| int errCode = 1089; |
| String msg = "Second operator in pushBefore should be the successor of the First operator."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } else if (edgesFromFirstToSecond > 1) { |
| int errCode = 1090; |
| String msg = "Second operator can have at most one incoming edge from First operator." |
| + " Found " + edgesFromFirstToSecond + " edges."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| //check if E (i.e., firstNode) can support multiple outputs before we short-circuit |
| |
| if(!firstNode.supportsMultipleOutputs()) { |
| int numSecondNodeSuccessors = (secondNodeSuccessors == null? 0 : secondNodeSuccessors.size()); |
| if((firstNodeSuccessors.size() > 0) || (numSecondNodeSuccessors > 0)) { |
| int errCode = 1091; |
| String msg = "First operator does not support multiple outputs." |
| + " On completing the pushBefore operation First operator will end up with " |
| + (firstNodeSuccessors.size() + numSecondNodeSuccessors) + " edges."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| } |
| |
| //Assume that we have a graph which is like |
| // A B C D |
| // \ | | / |
| // E |
| // / | \ |
| // F G H |
| // / | \ |
| // I J K |
| // |
| //Now pushBefore(E, G, 1) |
| //This can be done using the following sequence of transformations |
| //1. Promote G's successors as E's successors using reconnectSuccessors(G) |
| //2. Insert G between B and E using insertBetween(B, G, E) |
| //The graphs after each step |
| //Step 1 - Note that G is standing alone |
| // A B C D G |
| // \ | | / |
| // E |
| // / / | \ \ |
| // F I J K H |
| //Step 2 |
| // B |
| // | |
| // A G C D |
| // \ | | / |
| // E |
| // / / | \ \ |
| // F I J K H |
| |
| reconnectSuccessors(secondNode, false, false); |
| doInsertBetween(firstNodePredecessors.get(inputNum), secondNode, firstNode, false); |
| |
| //A note on the use of rewire |
| //Rewire is used within reconnectPredecessors. However, rewire is explicitly turned off in insertBetween |
| //The rewiring is done explicitly here to avoid exceptions that are generated due to precondition |
| //violations in insertBetween |
| secondNode.rewire(firstNode, inputNum, firstNodePredecessors.get(inputNum), true); |
| secondNode.regenerateProjectionMap(); |
| firstNode.rewire(firstNodePredecessors.get(inputNum), 0, secondNode, false); |
| |
| markDirty(); |
| return; |
| } |
| |
| /** |
| * Push one operator after another. This function is for use when the second |
| * operator has multiple outputs. The caller can specify which output of the |
| * second operator the first operator should be pushed to. |
| * @param first operator, assumed to have multiple outputs |
| * @param second operator, will be pushed after the first operator |
| * @param outputNum indicates which output of the first operator the second |
| * operator will be pushed onto. Numbered from 0. |
| * @throws PlanException if outputNum does not exist for first operator |
| */ |
| public void pushAfter(E first, E second, int outputNum) throws PlanException { |
| E firstNode = first; |
| E secondNode = second; |
| |
| if(firstNode == null) { |
| int errCode = 1085; |
| String msg = "First operator in pushAfter is null. Cannot pushBefore null operators."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| if(secondNode == null) { |
| int errCode = 1085; |
| String msg = "Second operator in pushAfter is null. Cannot pushBefore null operators."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| checkInPlan(firstNode); |
| checkInPlan(secondNode); |
| |
| List<E> firstNodePredecessors = (mToEdges.get(firstNode) == null? null : new ArrayList<E>(mToEdges.get(firstNode))); |
| |
| if(firstNodePredecessors == null) { |
| int errCode = 1088; |
| String msg = "First operator in pushAfter should have at least one input." |
| + " Found first operator with no inputs."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| List<E> firstNodeSuccessors = (mFromEdges.get(firstNode) == null? null: new ArrayList<E>(mFromEdges.get(firstNode))); |
| |
| if(firstNodeSuccessors == null || firstNodeSuccessors.size() <= 1) { |
| int size = (firstNodeSuccessors == null ? 0 : firstNodeSuccessors.size()); |
| int errCode = 1086; |
| String msg = "First operator in pushAfter should have multiple outputs." |
| + " Found first operator with " + size + " outputs."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| if(outputNum >= firstNodeSuccessors.size()) { |
| int errCode = 1087; |
| String msg = "The outputNum " + outputNum + " should be lesser than the number of outputs of the first operator." |
| + " Found first operator with " + firstNodeSuccessors.size() + " outputs."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| List<E> secondNodePredecessors = (mToEdges.get(secondNode) == null? null : new ArrayList<E>(mToEdges.get(secondNode))); |
| |
| List<E> secondNodeSuccessors = (mFromEdges.get(secondNode) == null? null : new ArrayList<E>(mFromEdges.get(secondNode))); |
| |
| if(secondNodeSuccessors == null || secondNodeSuccessors.size() > 1) { |
| int size = (secondNodeSuccessors == null ? 0 : secondNodeSuccessors.size()); |
| int errCode = 1088; |
| String msg = "Second operator in pushAfter should have one output." |
| + " Found second operator with " + size + " outputs."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| |
| //check for multiple edges from second to first |
| int edgesFromSecondToFirst = 0; |
| for(E node: secondNodeSuccessors) { |
| if(node == firstNode) { |
| ++edgesFromSecondToFirst; |
| } |
| } |
| |
| if(edgesFromSecondToFirst == 0) { |
| int errCode = 1089; |
| String msg = "Second operator in pushAfter should be the predecessor of the First operator."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } else if (edgesFromSecondToFirst > 1) { |
| int errCode = 1090; |
| String msg = "Second operator can have at most one outgoing edge from First operator." |
| + " Found " + edgesFromSecondToFirst + " edges."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| |
| //check if E (i.e., firstNode) can support multiple outputs before we short-circuit |
| |
| if(!firstNode.supportsMultipleInputs()) { |
| int numSecondNodePredecessors = (secondNodePredecessors == null? 0 : secondNodePredecessors.size()); |
| //if((firstNodePredecessors.size() > 0) || (numSecondNodePredecessors > 0)) { |
| if(numSecondNodePredecessors > 1) { |
| int errCode = 1091; |
| String msg = "First operator does not support multiple inputs." |
| + " On completing the pushAfter operation First operator will end up with " |
| + (firstNodePredecessors.size() + numSecondNodePredecessors) + " edges."; |
| throw new PlanException(msg, errCode, PigException.INPUT); |
| } |
| } |
| |
| //Assume that we have a graph which is like |
| // A B C D |
| // \ | | / |
| // E |
| // | |
| // G |
| // / | \ |
| // I J K |
| // |
| //Now pushAfter(G, E, 1) |
| //This can be done using the following sequence of transformations |
| //1. Promote E's predecessors as G's predecessors using reconnectPredecessors(E) |
| //2. Insert E between G and J using insertBetween(G, E, J) |
| //The graphs after each step |
| //Step 1 - Note that E is standing alone |
| // A B C D E |
| // \ | | / |
| // G |
| // / | \ |
| // I J K |
| //Step 2 |
| // A B C D |
| // \ | | / |
| // G |
| // / | \ |
| // I E K |
| // | |
| // J |
| |
| |
| reconnectPredecessors(secondNode, false, false); |
| doInsertBetween(firstNode, secondNode, firstNodeSuccessors.get(outputNum), false); |
| //A note on the use of rewire |
| //Rewire is used within reconnectPredecessors. However, rewire is explicitly turned off in insertBetween |
| //The rewiring is done explicitly here to avoid exceptions that are generated due to precodition |
| //violations in insertBetween |
| |
| if(secondNodePredecessors != null) { |
| for(int i = 0; i < secondNodePredecessors.size(); ++i) { |
| E secondNodePred = secondNodePredecessors.get(i); |
| secondNode.rewire(secondNodePred, i, firstNode, true); |
| } |
| } |
| |
| secondNode.regenerateProjectionMap(); |
| firstNodeSuccessors.get(outputNum).rewire(firstNode, 0, secondNode, false); |
| |
| markDirty(); |
| return; |
| |
| } |
| |
| /* |
| * A helper class that computes the index of each reference in a list for a quick lookup |
| */ |
| public static class IndexHelper <E> { |
| |
| private Map<E, Integer> mIndex = null; |
| |
| public IndexHelper(List<E> list) { |
| if(list != null) { |
| if(list.size() != 0) { |
| mIndex = new HashMap<E, Integer>(); |
| for(int i = 0; i < list.size(); ++i) { |
| mIndex.put(list.get(i), i); |
| } |
| } |
| } |
| } |
| |
| public int getIndex(E e) { |
| if(mIndex == null || mIndex.size() == 0) return -1; |
| return mIndex.get(e); |
| } |
| } |
| |
| } |