blob: 0e009194a700d2d03512868c0f1131586711692d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.visitor;
import java.util.List;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.optimizer.AllSameRalationalNodesVisitor;
import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
import org.apache.pig.newplan.logical.optimizer.UidResetter;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
public class ImplicitSplitInsertVisitor extends AllSameRalationalNodesVisitor {
public ImplicitSplitInsertVisitor(LogicalPlan plan) throws FrontendException {
super(plan, new DependencyOrderWalker(plan));
}
// Look to see if this is a non-split node with two outputs. If so
// it matches.
private boolean nodeHasTwoOutputs(LogicalRelationalOperator op) {
if (op instanceof LOSplit || op instanceof LOStore) {
return false;
}
List<Operator> succs = plan.getSuccessors(op);
if (succs != null && succs.size() >= 2) {
return true;
} else {
return false;
}
}
@Override
public void execute(LogicalRelationalOperator op) throws FrontendException {
if(!nodeHasTwoOutputs(op) ) {
return;
}
// For two successors of op here is a pictorial
// representation of the change required:
// BEFORE:
// Succ1 Succ2
// \ /
// op
// SHOULD BECOME:
// AFTER:
// Succ1 Succ2
// | |
// SplitOutput SplitOutput
// \ /
// Split
// |
// op
List<Operator> succs = plan.getSuccessors(op);
if (succs == null || succs.size() < 2) {
throw new FrontendException("Invalid match in ImplicitSplitInserter rule.", 2243);
}
LOSplit splitOp = new LOSplit(plan);
splitOp.setAlias(((LogicalRelationalOperator) op).getAlias());
Operator[] sucs = succs.toArray(new Operator[0]);
plan.add(splitOp);
plan.connect(op, splitOp);
for (Operator suc : sucs) {
// position is remembered in order to maintain the order of the successors
Pair<Integer, Integer> pos = plan.disconnect(op, suc);
LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
new ConstantExpression(filterPlan, Boolean.valueOf(true));
LOSplitOutput splitOutput = new LOSplitOutput((LogicalPlan) plan, filterPlan);
splitOutput.setAlias(splitOp.getAlias());
plan.add(splitOutput);
plan.connect(splitOp, splitOutput);
plan.connect(splitOutput, pos.first, suc, pos.second);
}
// Since we adjust the uid layout, clear all cached uids
UidResetter uidResetter = new UidResetter(plan);
uidResetter.visit();
// Manually regenerate schema
SchemaResetter schemaResetter = new SchemaResetter(plan, true);
schemaResetter.visit();
}
}