blob: c01e342b1b0c2a5c59eff0b46a9c3ad08cf6dc78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.relational;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.pig.FuncSpec;
import org.apache.pig.SortColInfo;
import org.apache.pig.SortColInfo.Order;
import org.apache.pig.SortInfo;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.newplan.ReverseDependencyOrderWalker;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
public class LOSort extends LogicalRelationalOperator{
private List<Boolean> mAscCols;
private FuncSpec mSortFunc;
private boolean mIsStar = false;
private long limit = -1;
private List<LogicalExpressionPlan> mSortColPlans;
public LOSort(OperatorPlan plan) {
super("LOSort", plan);
}
public LOSort(OperatorPlan plan, List<LogicalExpressionPlan> sortColPlans,
List<Boolean> ascCols,
FuncSpec sortFunc ) {
this( plan );
mSortColPlans = sortColPlans;
mAscCols = ascCols;
mSortFunc = sortFunc;
}
public List<LogicalExpressionPlan> getSortColPlans() {
return mSortColPlans;
}
public void setSortColPlans(List<LogicalExpressionPlan> sortPlans) {
mSortColPlans = sortPlans;
}
public List<Boolean> getAscendingCols() {
return mAscCols;
}
public void setAscendingCols(List<Boolean> ascCols) {
mAscCols = ascCols;
}
public FuncSpec getUserFunc() {
return mSortFunc;
}
public void setUserFunc(FuncSpec func) {
mSortFunc = func;
}
public boolean isStar() {
return mIsStar;
}
public void setStar(boolean b) {
mIsStar = b;
}
public void setLimit(long l)
{
limit = l;
}
public long getLimit()
{
return limit;
}
public boolean isLimited()
{
return (limit!=-1);
}
@Override
public LogicalSchema getSchema() throws FrontendException {
if (schema!=null)
return schema;
LogicalRelationalOperator input = null;
input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
schema = input.getSchema();
return schema;
}
@Override
public void accept(PlanVisitor v) throws FrontendException {
if (!(v instanceof LogicalRelationalNodesVisitor)) {
throw new FrontendException("Expected LogicalPlanVisitor", 2223);
}
((LogicalRelationalNodesVisitor)v).visit(this);
}
public SortInfo getSortInfo() throws FrontendException {
LogicalSchema schema = this.getSchema();
List<SortColInfo> sortColInfoList = new ArrayList<SortColInfo>();
for (int i = 0; i < mSortColPlans.size(); i++) {
//get the single project from the sort plans
LogicalExpressionPlan lp = mSortColPlans.get(i);
Iterator<Operator> opsIterator = lp.getOperators();
List<Operator> opsList = new ArrayList<Operator>();
while(opsIterator.hasNext()) {
opsList.add(opsIterator.next());
}
if(opsList.size() != 1 || !(opsList.get(0) instanceof ProjectExpression)) {
throw new FrontendException(this, "Unsupported operator in inner plan: " + opsList.get(0), 2237);
}
ProjectExpression project = (ProjectExpression) opsList.get(0);
//create SortColInfo from the project
if(project.isProjectStar()){
//there is no input schema, that is why project-star is still here
// we don't know how many columns are represented by this
//so don't add further columns to sort list
return new SortInfo(sortColInfoList);
}
if(project.isRangeProject()){
if(project.getEndCol() < 0){
//stop here for
// same reason as project-star condition above
//(unkown number of columns this represents)
return new SortInfo(sortColInfoList);
}
//expand the project-range into multiple SortColInfos
for(int cnum = project.getStartCol(); cnum < project.getEndCol(); cnum++){
sortColInfoList.add(
new SortColInfo(null, cnum, getOrder(mAscCols,i))
);
}
}
else{
int sortColIndex = project.getColNum();
String sortColName = (schema == null) ? null :
schema.getField(sortColIndex).alias;
sortColInfoList.add(
new SortColInfo(sortColName, sortColIndex, getOrder(mAscCols,i))
);
}
}
return new SortInfo(sortColInfoList);
}
private Order getOrder(List<Boolean> mAscCols2, int i) {
return mAscCols.get(i) ?
SortColInfo.Order.ASCENDING : SortColInfo.Order.DESCENDING;
}
@Override
public boolean isEqual(Operator other) throws FrontendException {
if (other != null && other instanceof LOSort) {
LOSort otherSort = (LOSort)other;
if (!mAscCols.equals(otherSort.getAscendingCols()))
return false;
if (!mSortFunc.equals(otherSort.getUserFunc()))
return false;
if (mIsStar!=otherSort.isStar())
return false;
if (limit!=otherSort.getLimit())
return false;
if (!mSortColPlans.equals(otherSort.getSortColPlans()))
return false;
}
return checkEquality((LogicalRelationalOperator)other);
}
public Operator getInput(LogicalPlan plan) {
return plan.getPredecessors(this).get(0);
}
private static class ResetProjectionAttachedRelationalOpVisitor
extends LogicalExpressionVisitor {
private LogicalRelationalOperator attachedRelationalOp;
ResetProjectionAttachedRelationalOpVisitor (
LogicalExpressionPlan plan, LogicalRelationalOperator op )
throws FrontendException {
super(plan, new ReverseDependencyOrderWalker(plan));
this.attachedRelationalOp = op;
}
@Override
public void visit(ProjectExpression pe) throws FrontendException {
pe.setAttachedRelationalOp(attachedRelationalOp);
}
}
public static LOSort createCopy(LOSort sort) throws FrontendException {
LOSort newSort = new LOSort(sort.getPlan(), null,
sort.getAscendingCols(),
sort.getUserFunc());
List<LogicalExpressionPlan> newSortColPlans =
new ArrayList<LogicalExpressionPlan>(sort.getSortColPlans().size());
for(LogicalExpressionPlan lep:sort.getSortColPlans() ) {
LogicalExpressionPlan new_lep = lep.deepCopy();
// Resetting the attached LOSort operator of the ProjectExpression
// to the newSort
new ResetProjectionAttachedRelationalOpVisitor(
new_lep, newSort ).visit();
newSortColPlans.add(new_lep);
}
newSort.setSortColPlans(newSortColPlans);
return newSort;
}
}