blob: c755f88d8af5afb102ff709da88bd5a9d30db6c1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test.utils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.test.PORead;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.util.Utils;
public class GenPhyOp{
// Source of the numeric part of every OperatorKey built in this class; replaceable via setR.
static Random r = new Random();
// Comparison selectors accepted by the three-argument topFilterOpWithProj
// and by topFilterOpWithProjWithCast.
public static final byte GTE = 1;
public static final byte GT = 2;
public static final byte LTE = 3;
public static final byte LT = 4;
/**
 * Plain holder that pairs a list of expression plans with the list of
 * flatten flags that accompanies them.
 */
public static class PlansAndFlattens {
    /** The plans, exactly as handed to the constructor. */
    public List<PhysicalPlan> plans;
    /** The flatten flags, exactly as handed to the constructor. */
    public List<Boolean> flattens;

    /**
     * @param p the plans to hold (stored by reference, not copied)
     * @param f the flatten flags to hold (stored by reference, not copied)
     */
    public PlansAndFlattens(List<PhysicalPlan> p, List<Boolean> f) {
        this.plans = p;
        this.flattens = f;
    }
}
// PigContext handed to POLoad operators built by topLoadOp and used when
// building the temporary FileSpec in getTempFileSpec; assigned through setPc.
public static PigContext pc;
/**
 * Builds a ConstantExpression whose OperatorKey is made from an empty
 * string and the next long drawn from {@code r}. No value is set.
 */
public static ConstantExpression exprConst() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new ConstantExpression(key);
}
/**
 * Builds a GreaterThanExpr with a fresh OperatorKey; operands and operand
 * type are left for the caller to set.
 */
public static GreaterThanExpr compGreaterThanExpr() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new GreaterThanExpr(key);
}
/**
 * Builds a GreaterThanExpr with a fresh OperatorKey and assigns its
 * operands and operand type.
 *
 * @param lhs the left-hand operand
 * @param rhs the right-hand operand
 * @param type the operand type passed to setOperandType
 * @return the configured expression
 */
public static GreaterThanExpr compGreaterThanExpr(
        ExpressionOperator lhs,
        ExpressionOperator rhs,
        byte type) {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    final GreaterThanExpr greaterThan = new GreaterThanExpr(key);
    greaterThan.setLhs(lhs);
    greaterThan.setRhs(rhs);
    greaterThan.setOperandType(type);
    return greaterThan;
}
/**
 * Builds a POAnd with a fresh OperatorKey over the two given operands,
 * with its operand type set to {@code DataType.BOOLEAN}.
 *
 * @param lhs the left-hand operand
 * @param rhs the right-hand operand
 * @return the configured POAnd
 */
public static POAnd compAndExpr(
        ExpressionOperator lhs,
        ExpressionOperator rhs) {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    final POAnd and = new POAnd(key);
    and.setLhs(lhs);
    and.setRhs(rhs);
    and.setOperandType(DataType.BOOLEAN);
    return and;
}
/**
 * Builds a POProject with a fresh OperatorKey, using the single-argument
 * constructor (no column is specified here).
 */
public static POProject exprProject() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new POProject(key);
}
/**
 * Builds a POProject with a fresh OperatorKey for the given column.
 *
 * @param col the column passed as the third constructor argument
 * @return the new POProject
 */
public static POProject exprProject(int col) {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new POProject(key, 1, col);
}
/**
 * Builds a GTOrEqualToExpr with a fresh OperatorKey; operands are left
 * for the caller to set.
 */
public static GTOrEqualToExpr compGTOrEqualToExpr() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new GTOrEqualToExpr(key);
}
/**
 * Builds an EqualToExpr with a fresh OperatorKey; operands are left for
 * the caller to set.
 */
public static EqualToExpr compEqualToExpr() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new EqualToExpr(key);
}
/**
 * Builds an EqualToExpr with a fresh OperatorKey and assigns its operands
 * and operand type.
 *
 * @param lhs the left-hand operand
 * @param rhs the right-hand operand
 * @param type the operand type passed to setOperandType
 * @return the configured expression
 */
public static EqualToExpr compEqualToExpr(
        ExpressionOperator lhs,
        ExpressionOperator rhs,
        byte type) {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    final EqualToExpr equalTo = new EqualToExpr(key);
    equalTo.setLhs(lhs);
    equalTo.setRhs(rhs);
    equalTo.setOperandType(type);
    return equalTo;
}
/**
 * Builds a NotEqualToExpr with a fresh OperatorKey; operands are left for
 * the caller to set.
 */
public static NotEqualToExpr compNotEqualToExpr() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new NotEqualToExpr(key);
}
/**
 * Builds a POIsNull with a fresh OperatorKey; its input is left for the
 * caller to set.
 */
public static POIsNull compIsNullExpr() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new POIsNull(key);
}
/**
 * Builds a LessThanExpr with a fresh OperatorKey; operands are left for
 * the caller to set.
 */
public static LessThanExpr compLessThanExpr() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new LessThanExpr(key);
}
/**
 * Builds a LTOrEqualToExpr with a fresh OperatorKey; operands are left
 * for the caller to set.
 */
public static LTOrEqualToExpr compLTOrEqualToExpr() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new LTOrEqualToExpr(key);
}
/**
 * Builds a POLocalRearrange with a fresh OperatorKey and an empty list of
 * plans.
 *
 * @return the new POLocalRearrange
 * @throws IllegalStateException if installing the empty plan list is
 *         rejected with a PlanException (previously this failure was
 *         silently discarded, leaving a half-initialised operator)
 */
public static POLocalRearrange topLocalRearrangeOp() {
    POLocalRearrange ret = new POLocalRearrange(new OperatorKey("", r
            .nextLong()));
    List<PhysicalPlan> plans = new LinkedList<PhysicalPlan>();
    try {
        ret.setPlans(plans);
    } catch (PlanException pe) {
        // Surface the failure with its cause instead of swallowing it; the
        // method signature stays free of checked exceptions for callers.
        throw new IllegalStateException(
                "Unable to set an empty plan list on POLocalRearrange", pe);
    }
    return ret;
}
/** Builds a POGlobalRearrange with a fresh OperatorKey. */
public static POGlobalRearrange topGlobalRearrangeOp() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new POGlobalRearrange(key);
}
/** Builds a POPackage with a fresh OperatorKey. */
public static POPackage topPackageOp() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new POPackage(key);
}
/**
 * Builds a POForEach with a fresh OperatorKey; input plans and flatten
 * flags are left for the caller to set.
 */
public static POForEach topForEachOp() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new POForEach(key);
}
/** Builds a POUnion with a fresh OperatorKey. */
public static POUnion topUnionOp() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new POUnion(key);
}
/** Builds a POPartialAgg keyed with an OperatorKey obtained from getOK(). */
public static POPartialAgg topPOPartialAgg() {
    return new POPartialAgg(getOK());
}
/**
 * Creates the PlansAndFlattens struct for
 * generate grpCol, *.
 *
 * The result holds one single-project plan for grpCol followed by one
 * single-project plan for every column 0..sample.size()-1, each with its
 * flatten flag set to false.
 *
 * @param grpCol - The column to be grouped on
 * @param sample - The sample tuple that is used to infer
 *                  result types and #projects for *
 * @return - The PlansAndFlattens struct which has the exprplan
 *                  for generate grpCol, * set.
 * @throws ExecException if the sample tuple cannot report a column type
 */
public static PlansAndFlattens topGenerateOpWithExPlan(
        int grpCol,
        Tuple sample) throws ExecException {
    POProject prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, grpCol);
    prj1.setResultType(sample.getType(grpCol));
    prj1.setOverloaded(false);

    List<Boolean> toBeFlattened = new LinkedList<Boolean>();
    toBeFlattened.add(false);

    PhysicalPlan plan1 = new PhysicalPlan();
    plan1.add(prj1);

    List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
    inputs.add(plan1);

    // One project per column of the sample. A plain index loop replaces the
    // former iteration over a throwaway array of nulls.
    final int columnCount = sample.size();
    for (int col = 0; col < columnCount; col++) {
        POProject project = new POProject(new OperatorKey("", r.nextLong()), -1, col);
        project.setResultType(sample.getType(col));
        project.setOverloaded(false);

        PhysicalPlan pl = new PhysicalPlan();
        pl.add(project);

        toBeFlattened.add(false);
        inputs.add(pl);
    }

    return new PlansAndFlattens(inputs, toBeFlattened);
}
/**
 * Creates the PlansAndFlattens struct for
 * generate grpCol, *
 * where every project is followed by a POCastDummy of the same result
 * type.
 *
 * @param grpCol - The column to be grouped on
 * @param sample - The sample tuple that is used to infer
 *                  result types and #projects for *
 * @return - The PlansAndFlattens struct which has the exprplan
 *                  for generate grpCol, * set.
 * @throws ExecException if the sample tuple cannot report a column type
 * @throws PlanException if a project cannot be connected to its cast
 */
public static PlansAndFlattens topGenerateOpWithExPlanLR(
        int grpCol,
        Tuple sample) throws ExecException, PlanException {
    POProject prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, grpCol);
    prj1.setResultType(sample.getType(grpCol));
    prj1.setOverloaded(false);

    POCastDummy cst = new POCastDummy(new OperatorKey("", r.nextLong()));
    cst.setResultType(sample.getType(grpCol));

    List<Boolean> toBeFlattened = new LinkedList<Boolean>();
    toBeFlattened.add(false);

    PhysicalPlan plan1 = new PhysicalPlan();
    plan1.add(prj1);
    plan1.add(cst);
    plan1.connect(prj1, cst);

    List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
    inputs.add(plan1);

    // One project -> cast plan per column of the sample. Locals replace the
    // two scratch arrays that were only ever used one slot at a time.
    final int columnCount = sample.size();
    for (int col = 0; col < columnCount; col++) {
        POProject project = new POProject(new OperatorKey("", r.nextLong()), -1, col);
        project.setResultType(sample.getType(col));
        project.setOverloaded(false);

        POCastDummy cast = new POCastDummy(new OperatorKey("", r.nextLong()));
        cast.setResultType(sample.getType(col));

        PhysicalPlan pl = new PhysicalPlan();
        pl.add(project);
        pl.add(cast);
        pl.connect(project, cast);

        toBeFlattened.add(false);
        inputs.add(pl);
    }

    return new PlansAndFlattens(inputs, toBeFlattened);
}
/**
 * Builds the PlansAndFlattens pair for
 * 'generate field': a single plan holding one POProject on the given
 * column, with a single flatten flag of false.
 *
 * @param field - The column to be generated
 * @param sample - The sample tuple whose type at {@code field} becomes
 *                  the project's result type
 * @return - The PlansAndFlattens struct for 'generate field'
 * @throws ExecException if the sample tuple cannot report the column type
 */
public static PlansAndFlattens topGenerateOpWithExPlanForFe(
        int field,
        Tuple sample) throws ExecException {
    final POProject project = new POProject(new OperatorKey("", r.nextLong()), -1, field);
    project.setResultType(sample.getType(field));
    project.setOverloaded(false);

    final List<Boolean> flattenFlags = new LinkedList<Boolean>();
    flattenFlags.add(false);

    final PhysicalPlan projectPlan = new PhysicalPlan();
    projectPlan.add(project);

    final List<PhysicalPlan> plans = new LinkedList<PhysicalPlan>();
    plans.add(projectPlan);

    return new PlansAndFlattens(plans, flattenFlags);
}
/**
 * Builds the PlansAndFlattens pair for
 * 'generate flatten(field)': a single plan holding one POProject on the
 * given column with result type {@code DataType.BAG}, and a single
 * flatten flag of true.
 *
 * @param field - The column to be generated and flattened
 * @return - The PlansAndFlattens struct for 'generate flatten(field)'
 * @throws ExecException declared by the signature
 */
public static PlansAndFlattens topGenerateOpWithExPlanForFeFlat(int field) throws ExecException {
    final POProject bagProject = new POProject(new OperatorKey("", r.nextLong()), -1, field);
    bagProject.setResultType(DataType.BAG);
    bagProject.setOverloaded(false);

    final List<Boolean> flattenFlags = new LinkedList<Boolean>();
    flattenFlags.add(true);

    final PhysicalPlan projectPlan = new PhysicalPlan();
    projectPlan.add(bagProject);

    final List<PhysicalPlan> plans = new LinkedList<PhysicalPlan>();
    plans.add(projectPlan);

    return new PlansAndFlattens(plans, flattenFlags);
}
/**
 * Builds the PlansAndFlattens pair for
 * 'generate field[0] field[1] ...'. Each plan is a POProject on the
 * requested column connected to a POCastDummy of the same result type;
 * every flatten flag is false.
 *
 * @param fields - The columns to be generated
 * @param sample - The sample tuple that is used to infer
 *                  result types
 * @return - The PlansAndFlattens struct for the requested columns
 * @throws ExecException if the sample tuple cannot report a column type
 * @throws PlanException if a project cannot be connected to its cast
 */
public static PlansAndFlattens topGenerateOpWithExPlanForFe(
        int[] fields,
        Tuple sample) throws ExecException, PlanException {
    final int count = fields.length;

    // All projects are created before any cast so that operator keys are
    // drawn from r in the same sequence as before.
    final POProject[] projects = new POProject[count];
    for (int idx = 0; idx < count; idx++) {
        final POProject project = new POProject(new OperatorKey("", r.nextLong()), -1, fields[idx]);
        projects[idx] = project;
        project.setResultType(sample.getType(fields[idx]));
        project.setOverloaded(false);
    }

    final List<Boolean> flattenFlags = new LinkedList<Boolean>();
    for (int idx = 0; idx < count; idx++) {
        flattenFlags.add(false);
    }

    final List<PhysicalPlan> plans = new LinkedList<PhysicalPlan>();
    for (int idx = 0; idx < count; idx++) {
        final PhysicalPlan plan = new PhysicalPlan();
        plan.add(projects[idx]);

        final POCastDummy cast = new POCastDummy(new OperatorKey("", r.nextLong()));
        cast.setResultType(sample.getType(fields[idx]));
        plan.add(cast);
        plan.connect(projects[idx], cast);

        plans.add(plan);
    }

    return new PlansAndFlattens(plans, flattenFlags);
}
/**
 * Builds the PlansAndFlattens pair for
 * 'generate field[0] field[1] ...' using a caller-supplied flatten list.
 * Each plan is a POProject on the requested column connected to a
 * POCastDummy of the same result type.
 *
 * @param fields - The columns to be generated
 * @param sample - The sample tuple that is used to infer
 *                  result types
 * @param toBeFlattened - The flatten flags; passed through unchanged
 * @return - The PlansAndFlattens struct for the requested columns
 * @throws ExecException if the sample tuple cannot report a column type
 * @throws PlanException if a project cannot be connected to its cast
 */
public static PlansAndFlattens topGenerateOpWithExPlanForFe(
        int[] fields,
        Tuple sample,
        List<Boolean> toBeFlattened) throws ExecException, PlanException {
    final int count = fields.length;

    // All projects are created before any cast so that operator keys are
    // drawn from r in the same sequence as before.
    final POProject[] projects = new POProject[count];
    for (int idx = 0; idx < count; idx++) {
        final POProject project = new POProject(new OperatorKey("", r.nextLong()), -1, fields[idx]);
        projects[idx] = project;
        project.setResultType(sample.getType(fields[idx]));
        project.setOverloaded(false);
    }

    final List<PhysicalPlan> plans = new LinkedList<PhysicalPlan>();
    for (int idx = 0; idx < count; idx++) {
        final PhysicalPlan plan = new PhysicalPlan();
        plan.add(projects[idx]);

        final POCastDummy cast = new POCastDummy(new OperatorKey("", r.nextLong()));
        cast.setResultType(sample.getType(fields[idx]));
        plan.add(cast);
        plan.connect(projects[idx], cast);

        plans.add(plan);
    }

    return new PlansAndFlattens(plans, toBeFlattened);
}
/**
 * Creates the POLocalRearrange operator with the given index for
 * group by grpCol, then appends a POCastDummy (typed like the group
 * column) as the leaf of its first plan and re-installs the plans.
 *
 * @param index - The input index of this POLocalRearrange operator
 * @param grpCol - The column to be grouped on
 * @param sample - Sample tuple used to look up the group column's type
 * @return - The POLocalRearrange operator
 * @throws ExecException if the sample tuple cannot report the column type
 * @throws PlanException if the plan cannot be extended or re-installed
 */
public static POLocalRearrange topLocalRearrangeOPWithPlan(int index, int grpCol, Tuple sample) throws ExecException, PlanException{
    final POLocalRearrange rearrange = topLocalRearrangeOPWithPlanPlain(index, grpCol, sample);
    final List<PhysicalPlan> keyPlans = rearrange.getPlans();

    final POCastDummy cast = new POCastDummy(new OperatorKey("", r.nextLong()));
    cast.setResultType(sample.getType(grpCol));
    keyPlans.get(0).addAsLeaf(cast);

    // Re-install the plans so the operator sees the new leaf.
    rearrange.setPlans(keyPlans);
    return rearrange;
}
/**
 * Creates the POLocalRearrange operator with the given index for
 * group by grpCol, using a single plan that contains only a POProject
 * on the group column.
 *
 * @param index - The input index of this POLocalRearrange operator
 * @param grpCol - The column to be grouped on
 * @param sample - Sample tuple used to look up the group column's type
 * @return - The POLocalRearrange operator
 * @throws ExecException if the sample tuple cannot report the column type
 * @throws PlanException if the plans cannot be installed
 */
public static POLocalRearrange topLocalRearrangeOPWithPlanPlain(int index, int grpCol, Tuple sample) throws ExecException, PlanException{
    final POProject keyProject = new POProject(new OperatorKey("", r.nextLong()), -1, grpCol);
    keyProject.setResultType(sample.getType(grpCol));
    keyProject.setOverloaded(false);

    final PhysicalPlan keyPlan = new PhysicalPlan();
    keyPlan.add(keyProject);

    final List<PhysicalPlan> keyPlans = new ArrayList<PhysicalPlan>();
    keyPlans.add(keyPlan);

    final POLocalRearrange rearrange = topLocalRearrangeOp();
    rearrange.setPlans(keyPlans);
    rearrange.setIndex(index);
    rearrange.setResultType(DataType.TUPLE);
    rearrange.setKeyType(sample.getType(grpCol));
    return rearrange;
}
/**
 * Creates the POForEach operator for
 * foreach A generate field
 *
 * @param field - The column to be generated
 * @param sample - Sample tuple needed for topGenerateOpWithExPlanForFe
 * @return - The POForEach operator, with result type DataType.TUPLE
 * @throws ExecException propagated from topGenerateOpWithExPlanForFe
 * @throws PlanException declared by the signature
 */
public static POForEach topForEachOPWithPlan(int field, Tuple sample) throws ExecException, PlanException{
    final PlansAndFlattens generated = topGenerateOpWithExPlanForFe(field, sample);

    final POForEach forEach = topForEachOp();
    forEach.setInputPlans(generated.plans);
    forEach.setToBeFlattened(generated.flattens);
    forEach.setResultType(DataType.TUPLE);
    return forEach;
}
/**
 * Creates the POForEach operator for
 * foreach A generate field[0] field[1]
 *
 * @param fields - The columns to be generated
 * @param sample - Sample tuple needed for topGenerateOpWithExPlanForFe
 * @return - The POForEach operator, with result type DataType.TUPLE
 * @throws ExecException propagated from topGenerateOpWithExPlanForFe
 * @throws PlanException propagated from topGenerateOpWithExPlanForFe
 */
public static POForEach topForEachOPWithPlan(int[] fields, Tuple sample) throws ExecException, PlanException{
    final PlansAndFlattens generated = topGenerateOpWithExPlanForFe(fields, sample);

    final POForEach forEach = topForEachOp();
    forEach.setInputPlans(generated.plans);
    forEach.setToBeFlattened(generated.flattens);
    forEach.setResultType(DataType.TUPLE);
    return forEach;
}
/**
 * Creates the POForEach operator for
 * foreach A generate field[0] field[1]
 * with caller-supplied flatten flags.
 *
 * @param fields - The columns to be generated
 * @param sample - Sample tuple needed for topGenerateOpWithExPlanForFe
 * @param toBeFlattened - The flatten flags installed on the operator
 * @return - The POForEach operator, with result type DataType.TUPLE
 * @throws ExecException propagated from topGenerateOpWithExPlanForFe
 * @throws PlanException propagated from topGenerateOpWithExPlanForFe
 */
public static POForEach topForEachOPWithPlan(
        int[] fields,
        Tuple sample,
        List<Boolean> toBeFlattened) throws ExecException, PlanException{
    final PlansAndFlattens generated = topGenerateOpWithExPlanForFe(fields, sample, toBeFlattened);

    final POForEach forEach = topForEachOp();
    forEach.setInputPlans(generated.plans);
    forEach.setToBeFlattened(toBeFlattened);
    forEach.setResultType(DataType.TUPLE);
    return forEach;
}
/**
 * Creates the POForEach operator for
 * foreach A generate flatten(field)
 *
 * @param field - The column to be generated and flattened
 * @return - The POForEach operator, with result type DataType.TUPLE
 * @throws ExecException propagated from topGenerateOpWithExPlanForFeFlat
 * @throws PlanException declared by the signature
 */
public static POForEach topForEachOPWithPlan(int field) throws ExecException, PlanException{
    final PlansAndFlattens generated = topGenerateOpWithExPlanForFeFlat(field);

    final POForEach forEach = topForEachOp();
    forEach.setInputPlans(generated.plans);
    forEach.setToBeFlattened(generated.flattens);
    forEach.setResultType(DataType.TUPLE);
    return forEach;
}
/**
 * Builds a POLoad with a fresh OperatorKey and hands it the class-level
 * PigContext {@code pc}.
 */
public static POLoad topLoadOp() {
    final POLoad load = new POLoad(new OperatorKey("", r.nextLong()));
    load.setPc(pc);
    return load;
}
/**
 * Builds a POFilter whose plan consists of a single constant expression
 * holding {@code true}.
 *
 * @return the new POFilter
 */
public static POFilter topFilterOp() {
    POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
    ConstantExpression ex = GenPhyOp.exprConst();
    // Boolean.TRUE replaces the deprecated boxing constructor.
    ex.setValue(Boolean.TRUE);
    PhysicalPlan pp = new PhysicalPlan();
    pp.add(ex);
    ret.setPlan(pp);
    return ret;
}
/**
 * Builds a POFilter with a fresh OperatorKey whose input list contains
 * exactly the given operator. No filter plan is set.
 *
 * @param input the operator to register as the filter's only input
 * @return the new POFilter
 */
public static POFilter connectedFilterOp(PhysicalOperator input) {
    final List<PhysicalOperator> inputs = new ArrayList<PhysicalOperator>(1);
    inputs.add(input);
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new POFilter(key, inputs);
}
/** Builds a POLimit with a fresh OperatorKey. */
public static POLimit topLimitOp() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new POLimit(key);
}
/**
 * Builds a POFilter whose plan is a GreaterThanExpr over two integer
 * constants, with lhsVal as its left operand and rhsVal as its right.
 *
 * @param lhsVal value of the left-hand constant
 * @param rhsVal value of the right-hand constant
 * @return the new POFilter
 * @throws ExecException declared by the signature
 * @throws PlanException if the expression plan cannot be connected
 */
public static POFilter topFilterOpWithExPlan(int lhsVal, int rhsVal)
        throws ExecException, PlanException {
    final POFilter filter = new POFilter(new OperatorKey("", r.nextLong()));

    final ConstantExpression left = GenPhyOp.exprConst();
    left.setValue(lhsVal);
    final ConstantExpression right = GenPhyOp.exprConst();
    right.setValue(rhsVal);

    final GreaterThanExpr greaterThan = GenPhyOp.compGreaterThanExpr();
    greaterThan.setLhs(left);
    greaterThan.setRhs(right);
    greaterThan.setOperandType(DataType.INTEGER);

    final PhysicalPlan condition = new PhysicalPlan();
    condition.add(left);
    condition.add(right);
    condition.add(greaterThan);
    condition.connect(left, greaterThan);
    condition.connect(right, greaterThan);

    filter.setPlan(condition);
    return filter;
}
/**
 * Builds a POFilter whose plan is a GreaterThanExpr with an integer
 * projection of the given column on the left and an integer constant on
 * the right.
 *
 * @param col the column to project as the left-hand operand
 * @param rhsVal value of the right-hand constant
 * @return the new POFilter
 * @throws ExecException declared by the signature
 * @throws PlanException if the expression plan cannot be connected
 */
public static POFilter topFilterOpWithProj(int col, int rhsVal)
        throws ExecException, PlanException {
    final POFilter filter = new POFilter(new OperatorKey("", r.nextLong()));

    final POProject column = exprProject();
    column.setResultType(DataType.INTEGER);
    column.setColumn(col);
    column.setOverloaded(false);

    final ConstantExpression threshold = GenPhyOp.exprConst();
    threshold.setValue(rhsVal);

    final GreaterThanExpr greaterThan = GenPhyOp.compGreaterThanExpr();
    greaterThan.setLhs(column);
    greaterThan.setRhs(threshold);
    greaterThan.setOperandType(DataType.INTEGER);

    final PhysicalPlan condition = new PhysicalPlan();
    condition.add(column);
    condition.add(threshold);
    condition.add(greaterThan);
    condition.connect(column, greaterThan);
    condition.connect(threshold, greaterThan);

    filter.setPlan(condition);
    return filter;
}
/**
 * Builds a POFilter whose plan compares an integer projection of the
 * given column (left operand) with an integer constant (right operand),
 * using the comparison selected by CompType.
 *
 * @param col the column to project as the left-hand operand
 * @param rhsVal value of the right-hand constant
 * @param CompType one of GenPhyOp.GTE, GenPhyOp.GT, GenPhyOp.LTE or
 *                 GenPhyOp.LT
 * @return the new POFilter
 * @throws ExecException declared by the signature
 * @throws PlanException if the expression plan cannot be connected
 * @throws IllegalArgumentException if CompType is not one of the four
 *         supported selectors (previously a NullPointerException)
 */
public static POFilter topFilterOpWithProj(int col, int rhsVal,
        byte CompType) throws ExecException, PlanException {
    POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
    POProject proj = exprProject();
    proj.setResultType(DataType.INTEGER);
    proj.setColumn(col);
    proj.setOverloaded(false);
    ConstantExpression ce2 = GenPhyOp.exprConst();
    ce2.setValue(rhsVal);

    BinaryComparisonOperator cop;
    switch (CompType) {
    case GenPhyOp.GTE:
        cop = GenPhyOp.compGTOrEqualToExpr();
        break;
    case GenPhyOp.GT:
        cop = GenPhyOp.compGreaterThanExpr();
        break;
    case GenPhyOp.LTE:
        cop = GenPhyOp.compLTOrEqualToExpr();
        break;
    case GenPhyOp.LT:
        cop = GenPhyOp.compLessThanExpr();
        break;
    default:
        // Fail with a clear message instead of dereferencing a null operator.
        throw new IllegalArgumentException("Unsupported comparison type: " + CompType);
    }

    cop.setLhs(proj);
    cop.setRhs(ce2);
    cop.setOperandType(DataType.INTEGER);

    PhysicalPlan ep = new PhysicalPlan();
    ep.add(proj);
    ep.add(ce2);
    ep.add(cop);
    ep.connect(proj, cop);
    ep.connect(ce2, cop);
    ret.setPlan(ep);
    return ret;
}
/**
 * Builds a POFilter whose plan compares a projection of the given column,
 * routed through a POCastDummy, with an integer constant, using the
 * comparison selected by CompType. The cast is the comparison's left
 * operand and the constant its right operand.
 *
 * @param col the column to project
 * @param rhsVal value of the right-hand constant
 * @param CompType one of GenPhyOp.GTE, GenPhyOp.GT, GenPhyOp.LTE or
 *                 GenPhyOp.LT
 * @return the new POFilter
 * @throws ExecException declared by the signature
 * @throws PlanException if the expression plan cannot be connected
 * @throws IllegalArgumentException if CompType is not one of the four
 *         supported selectors (previously a NullPointerException)
 */
public static POFilter topFilterOpWithProjWithCast(int col, int rhsVal, byte CompType)
        throws ExecException, PlanException {
    POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
    POProject proj = exprProject();
    proj.setResultType(DataType.INTEGER);
    proj.setColumn(col);
    proj.setOverloaded(false);
    ConstantExpression ce2 = GenPhyOp.exprConst();
    ce2.setValue(rhsVal);

    BinaryComparisonOperator cop;
    switch (CompType) {
    case GenPhyOp.GTE:
        cop = GenPhyOp.compGTOrEqualToExpr();
        break;
    case GenPhyOp.GT:
        cop = GenPhyOp.compGreaterThanExpr();
        break;
    case GenPhyOp.LTE:
        cop = GenPhyOp.compLTOrEqualToExpr();
        break;
    case GenPhyOp.LT:
        cop = GenPhyOp.compLessThanExpr();
        break;
    default:
        // Fail with a clear message instead of dereferencing a null operator.
        throw new IllegalArgumentException("Unsupported comparison type: " + CompType);
    }

    POCastDummy cst = new POCastDummy(new OperatorKey("", r.nextLong()));
    cop.setLhs(cst);
    cop.setRhs(ce2);
    cop.setOperandType(DataType.INTEGER);

    PhysicalPlan ep = new PhysicalPlan();
    ep.add(cst);
    ep.add(proj);
    ep.add(ce2);
    ep.add(cop);
    ep.connect(proj, cst);
    ep.connect(cst, cop);
    ep.connect(ce2, cop);
    ret.setPlan(ep);
    return ret;
}
/**
 * Builds a PORead with a fresh OperatorKey over the given bag.
 *
 * @param bag the bag handed to the PORead constructor
 * @return the new PORead
 */
public static PORead topReadOp(DataBag bag) {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new PORead(key, bag);
}
/**
 * Builds a POStore with a fresh OperatorKey whose store file is the
 * placeholder name "DummyFil" paired with a PigStorage FuncSpec.
 */
public static POStore dummyPigStorageOp() {
    final POStore store = new POStore(new OperatorKey("", r.nextLong()));
    final FuncSpec storageFunc = new FuncSpec(PigStorage.class.getName() + "()");
    store.setSFile(new FileSpec("DummyFil", storageFunc));
    return store;
}
/** Builds a POStore with a fresh OperatorKey and no store file set. */
public static POStore topStoreOp() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new POStore(key);
}
/**
 * Replaces the class-level Random from which operator keys are drawn.
 *
 * @param r the Random to use from now on
 */
public static void setR(Random r) {
    // Qualified with the class name because the parameter shadows the field.
    GenPhyOp.r = r;
}
/** Builds a MapReduceOper with a fresh OperatorKey. */
public static MapReduceOper MROp() {
    final OperatorKey key = new OperatorKey("", r.nextLong());
    return new MapReduceOper(key);
}
/**
 * Builds a FileSpec from a temporary path obtained through FileLocalizer
 * and the temp-file compressor name obtained through Utils, both looked
 * up with the class-level PigContext {@code pc}.
 *
 * @throws IOException declared by the signature
 */
private static FileSpec getTempFileSpec() throws IOException {
    final String tmpPath = FileLocalizer.getTemporaryPath(pc).toString();
    final FuncSpec tmpFunc = new FuncSpec(Utils.getTmpFileCompressorName(pc));
    return new FileSpec(tmpPath, tmpFunc);
}
/**
 * Builds a POSplit with a fresh OperatorKey whose split store is the
 * FileSpec returned by getTempFileSpec().
 *
 * @throws IOException propagated from getTempFileSpec()
 */
public static POSplit topSplitOp() throws IOException{
    final POSplit split = new POSplit(new OperatorKey("", r.nextLong()));
    final FileSpec splitStore = getTempFileSpec();
    split.setSplitStore(splitStore);
    return split;
}
/**
 * Builds a plan containing the chain
 * POLocalRearrange -> POGlobalRearrange -> POPackage.
 *
 * @return the three-operator plan
 * @throws ExecException declared by the signature
 * @throws PlanException if the operators cannot be connected
 */
public static PhysicalPlan grpChain() throws ExecException, PlanException{
    final PhysicalPlan chain = new PhysicalPlan();

    final POLocalRearrange localRearrange = GenPhyOp.topLocalRearrangeOp();
    final POGlobalRearrange globalRearrange = GenPhyOp.topGlobalRearrangeOp();
    final POPackage pkg = GenPhyOp.topPackageOp();

    chain.add(localRearrange);
    chain.add(globalRearrange);
    chain.add(pkg);

    chain.connect(localRearrange, globalRearrange);
    chain.connect(globalRearrange, pkg);
    return chain;
}
/**
 * Builds a plan containing the chain
 * POLoad -> POLocalRearrange -> POGlobalRearrange -> POPackage.
 *
 * @return the four-operator plan
 * @throws ExecException declared by the signature
 * @throws PlanException if the operators cannot be connected
 */
public static PhysicalPlan loadedGrpChain() throws ExecException, PlanException{
    final PhysicalPlan chain = new PhysicalPlan();

    final POLoad load = GenPhyOp.topLoadOp();
    final POLocalRearrange localRearrange = GenPhyOp.topLocalRearrangeOp();
    final POGlobalRearrange globalRearrange = GenPhyOp.topGlobalRearrangeOp();
    final POPackage pkg = GenPhyOp.topPackageOp();

    chain.add(load);
    chain.add(localRearrange);
    chain.add(globalRearrange);
    chain.add(pkg);

    chain.connect(load, localRearrange);
    chain.connect(localRearrange, globalRearrange);
    chain.connect(globalRearrange, pkg);
    return chain;
}
/**
 * Builds a plan containing a POLoad connected to the POFilter produced
 * by topFilterOp().
 *
 * @return the two-operator plan
 * @throws ExecException declared by the signature
 * @throws PlanException if the operators cannot be connected
 */
public static PhysicalPlan loadedFilter() throws ExecException, PlanException{
    final PhysicalPlan plan = new PhysicalPlan();

    final POLoad load = GenPhyOp.topLoadOp();
    final POFilter filter = GenPhyOp.topFilterOp();

    plan.add(load);
    plan.add(filter);
    plan.connect(load, filter);
    return plan;
}
/**
 * Builds a POForEach with one input plan per entry of clsName. Each plan
 * is a star POProject (result type DataType.TUPLE) connected to a
 * POUserFunc created from a FuncSpec of that entry; no plan is flattened.
 *
 * @param clsName the strings from which each FuncSpec is built, one per
 *                generated plan
 * @return the POForEach, with result type DataType.TUPLE
 * @throws PlanException if a project cannot be connected to its UDF
 */
public static POForEach topForEachOPWithUDF(List<String> clsName) throws PlanException{
    List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
    List<Boolean> flattened3 = new ArrayList<Boolean>();
    for (String udfSpec : clsName) {
        PhysicalPlan ep4 = new PhysicalPlan();

        POProject prjStar4 = new POProject(new OperatorKey("", r.nextLong()));
        prjStar4.setResultType(DataType.TUPLE);
        prjStar4.setStar(true);
        ep4.add(prjStar4);

        // Typed list instead of a raw List, so the call below is checked.
        List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
        ufInps.add(prjStar4);

        POUserFunc uf = new POUserFunc(new OperatorKey("", r.nextLong()), -1, ufInps, new FuncSpec(udfSpec));
        ep4.add(uf);
        ep4.connect(prjStar4, uf);

        ep4s.add(ep4);
        flattened3.add(false);
    }
    POForEach fe3 = new POForEach(new OperatorKey("", r.nextLong()), 1,
            ep4s, flattened3);
    fe3.setResultType(DataType.TUPLE);
    return fe3;
}
/**
 * Builds an arithmetic expression plan over seven integer constants
 * (values 0 through 6). In connection order the wiring is:
 * Add(c0, c1), Divide(c2, c3), Subtract(add, divide), Mod(c4, c5),
 * Multiply(mod, c6), and a final Multiply(subtract, firstMultiply).
 *
 * @return the assembled plan
 * @throws PlanException if any connection is rejected
 */
public static PhysicalPlan arithPlan() throws PlanException{
    final PhysicalPlan plan = new PhysicalPlan();

    final ConstantExpression[] constants = new ConstantExpression[7];
    for (int value = 0; value < constants.length; value++) {
        final ConstantExpression constant = GenPhyOp.exprConst();
        constant.setValue(value);
        constants[value] = constant;
        plan.add(constant);
    }

    final Add sum = new Add(getOK());
    plan.add(sum);
    plan.connect(constants[0], sum);
    plan.connect(constants[1], sum);

    final Divide quotient = new Divide(getOK());
    plan.add(quotient);
    plan.connect(constants[2], quotient);
    plan.connect(constants[3], quotient);

    final Subtract difference = new Subtract(getOK());
    plan.add(difference);
    plan.connect(sum, difference);
    plan.connect(quotient, difference);

    final Mod remainder = new Mod(getOK());
    plan.add(remainder);
    plan.connect(constants[4], remainder);
    plan.connect(constants[5], remainder);

    final Multiply innerProduct = new Multiply(getOK());
    plan.add(innerProduct);
    plan.connect(remainder, innerProduct);
    plan.connect(constants[6], innerProduct);

    final Multiply outerProduct = new Multiply(getOK());
    plan.add(outerProduct);
    plan.connect(difference, outerProduct);
    plan.connect(innerProduct, outerProduct);

    return plan;
}
/**
 * Returns a new OperatorKey built from an empty string and the next long
 * drawn from {@code r}.
 */
public static OperatorKey getOK(){
    final long id = r.nextLong();
    return new OperatorKey("", id);
}
/**
 * Sets the class-level PigContext used by topLoadOp and getTempFileSpec.
 *
 * @param pc the PigContext to use from now on
 */
public static void setPc(PigContext pc) {
    // Qualified with the class name because the parameter shadows the field.
    GenPhyOp.pc = pc;
}
}