blob: 72468045160f83647b3ad75384529fdfc8324bb8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.test.utils.GenRandomData;
import org.junit.Test;
public class TestPOSort {
Random r = new Random(42L);
int MAX_TUPLES = 10;
@Test
public void testPOSortAscString() throws ExecException {
DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
MAX_TUPLES, 100);
poSortAscString( input );
}
@Test
public void testPOSortAscStringWithNull() throws ExecException {
DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBagWithNulls(r,
MAX_TUPLES, 100);
poSortAscString( input );
}
public void poSortAscString(DataBag input) throws ExecException {
List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
pr1.setResultType(DataType.CHARARRAY);
PhysicalPlan expPlan = new PhysicalPlan();
expPlan.add(pr1);
sortPlans.add(expPlan);
List<Boolean> mAscCols = new LinkedList<Boolean>();
mAscCols.add(true);
PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
inputs.add(read);
POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
sortPlans, mAscCols, null);
//verify
Tuple t = null;
Result res1 = sort.getNextTuple();
Result res2 = sort.getNextTuple();
while (res2.returnStatus != POStatus.STATUS_EOP) {
Object i1 = ((Tuple) res1.result).get(0);
Object i2 = ((Tuple) res2.result).get(0);
//System.out.println("i1: " + i1.toString() + " i2: " + i2.toString());
int i = DataType.compare(i1, i2);
System.out.println("RESULT2=i : " + res2.result + " i = " + i);
assertEquals(true, (i <= 0));
res1 = res2;
res2 = sort.getNextTuple();
}
}
@Test
public void testPOSortDescString() throws ExecException {
DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
MAX_TUPLES, 100);
poSortDescString(input);
}
@Test
public void testPOSortDescStringWithNulls() throws ExecException {
DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBagWithNulls(r,
MAX_TUPLES, 100);
poSortDescString(input);
}
public void poSortDescString(DataBag input) throws ExecException {
List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
pr1.setResultType(DataType.CHARARRAY);
PhysicalPlan expPlan = new PhysicalPlan();
expPlan.add(pr1);
sortPlans.add(expPlan);
List<Boolean> mAscCols = new LinkedList<Boolean>();
mAscCols.add(false);
PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
inputs.add(read);
POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
sortPlans, mAscCols, null);
Tuple t = null;
Result res1 = sort.getNextTuple();
// System.out.println(res1.result);
Result res2 = sort.getNextTuple();
while (res2.returnStatus != POStatus.STATUS_EOP) {
Object i1 = ((Tuple) res1.result).get(0);
Object i2 = ((Tuple) res2.result).get(0);
int i = DataType.compare(i1, i2);
// System.out.println(res2.result + " i = " + i);
assertEquals(true, (i >= 0));
res1 = res2;
res2 = sort.getNextTuple();
}
}
@Test
public void testPOSortAsc() throws ExecException {
DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
MAX_TUPLES, 100);
poSortAscInt( input );
}
@Test
public void testPOSortAscWithNulls() throws ExecException {
DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBagWithNulls(r,
MAX_TUPLES, 100);
poSortAscInt( input );
}
public void poSortAscInt( DataBag input) throws ExecException {
List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
pr1.setResultType(DataType.INTEGER);
PhysicalPlan expPlan = new PhysicalPlan();
expPlan.add(pr1);
sortPlans.add(expPlan);
List<Boolean> mAscCols = new LinkedList<Boolean>();
mAscCols.add(true);
PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
inputs.add(read);
POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
sortPlans, mAscCols, null);
Tuple t = null;
Result res1 = sort.getNextTuple();
// System.out.println(res1.result);
Result res2 = sort.getNextTuple();
while (res2.returnStatus != POStatus.STATUS_EOP) {
Object i1 = ((Tuple) res1.result).get(1);
Object i2 = ((Tuple) res2.result).get(1);
int i = DataType.compare(i1, i2);
assertEquals(true, (i <= 0));
// System.out.println(res2.result);
res1 = res2;
res2 = sort.getNextTuple();
}
}
@Test
public void testPOSortDescInt() throws ExecException {
DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
MAX_TUPLES, 100);
poSortDescInt(input );
}
@Test
public void testPOSortDescIntWithNulls() throws ExecException {
DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBagWithNulls(r,
MAX_TUPLES, 100);
poSortDescInt(input );
}
public void poSortDescInt(DataBag input) throws ExecException {
List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
pr1.setResultType(DataType.INTEGER);
PhysicalPlan expPlan = new PhysicalPlan();
expPlan.add(pr1);
sortPlans.add(expPlan);
List<Boolean> mAscCols = new LinkedList<Boolean>();
mAscCols.add(false);
PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
inputs.add(read);
POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
sortPlans, mAscCols, null);
Tuple t = null;
Result res1 = sort.getNextTuple();
// System.out.println(res1.result);
Result res2 = sort.getNextTuple();
while (res2.returnStatus != POStatus.STATUS_EOP) {
Object i1 = ((Tuple) res1.result).get(1);
Object i2 = ((Tuple) res2.result).get(1);
int i = DataType.compare(i1, i2);
assertEquals(true, (i >= 0));
// System.out.println(res2.result);
res1 = res2;
res2 = sort.getNextTuple();
}
}
/***
* Sorting
* (1, 10)
* (3, 8)
* (2, 8)
*
* BY $1 DESC, $0 ASC
*
* should return
* (1, 10)
* (2 ,8)
* (3, 8)
*
* @throws ExecException
*/
@Test
public void testPOSortMixAscDesc1() throws ExecException {
DataBag input = DefaultBagFactory.getInstance().newDefaultBag() ;
Tuple t1 = TupleFactory.getInstance().newTuple() ;
t1.append(1);
t1.append(10);
input.add(t1);
Tuple t2 = TupleFactory.getInstance().newTuple() ;
t2.append(3);
t2.append(8);
input.add(t2);
Tuple t3 = TupleFactory.getInstance().newTuple() ;
t3.append(2);
t3.append(8);
input.add(t3);
List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
pr1.setResultType(DataType.INTEGER);
PhysicalPlan expPlan1 = new PhysicalPlan();
expPlan1.add(pr1);
sortPlans.add(expPlan1);
POProject pr2 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
pr2.setResultType(DataType.INTEGER);
PhysicalPlan expPlan2 = new PhysicalPlan();
expPlan2.add(pr2);
sortPlans.add(expPlan2);
List<Boolean> mAscCols = new LinkedList<Boolean>();
mAscCols.add(false);
mAscCols.add(true);
PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
inputs.add(read);
POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
sortPlans, mAscCols, null);
Tuple t = null;
Result res ;
// output line 1
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 1) ;
assertEquals(((Tuple) res.result).get(1), 10) ;
// output line 2
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 2) ;
assertEquals(((Tuple) res.result).get(1), 8) ;
// output line 3
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 3) ;
assertEquals(((Tuple) res.result).get(1), 8) ;
}
/***
* Sorting
* (null, 10)
* (1, 8)
* (1, null)
* (null,null)
* (3, 8)
*
* BY $1 DESC, $0 ASC
*
* should return
* (null, 10)
* (1, 8 )
* (3, 8 )
* (null,null)
* (1, null)
* @throws ExecException
*/
@Test
public void testPOSortMixAscDesc1WithNull() throws ExecException {
DataBag input = DefaultBagFactory.getInstance().newDefaultBag() ;
Tuple t1 = TupleFactory.getInstance().newTuple() ;
t1.append(null);
t1.append(10);
input.add(t1);
Tuple t2 = TupleFactory.getInstance().newTuple() ;
t2.append(1);
t2.append(8);
input.add(t2);
Tuple t3 = TupleFactory.getInstance().newTuple() ;
t3.append(1);
t3.append(null);
input.add(t3);
Tuple t4 = TupleFactory.getInstance().newTuple() ;
t4.append(null);
t4.append(null);
input.add(t4);
Tuple t5 = TupleFactory.getInstance().newTuple() ;
t5.append(3);
t5.append(8);
input.add(t5);
List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
pr1.setResultType(DataType.INTEGER);
PhysicalPlan expPlan1 = new PhysicalPlan();
expPlan1.add(pr1);
sortPlans.add(expPlan1);
POProject pr2 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
pr2.setResultType(DataType.INTEGER);
PhysicalPlan expPlan2 = new PhysicalPlan();
expPlan2.add(pr2);
sortPlans.add(expPlan2);
List<Boolean> mAscCols = new LinkedList<Boolean>();
mAscCols.add(false);
mAscCols.add(true);
PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
inputs.add(read);
POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
sortPlans, mAscCols, null);
Tuple t = null;
Result res ;
// output line 1
res = sort.getNextTuple();
assertNull(((Tuple) res.result).get(0)) ;
assertEquals(((Tuple) res.result).get(1), 10) ;
// output line 2
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 1) ;
assertEquals(((Tuple) res.result).get(1), 8) ;
// output line 3
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 3) ;
assertEquals(((Tuple) res.result).get(1), 8) ;
// output line 4
res = sort.getNextTuple();
assertNull(((Tuple) res.result).get(0)) ;
assertNull(((Tuple) res.result).get(1)) ;
// output line 5
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 1 );
assertNull(((Tuple) res.result).get(1)) ;
}
/***
* Sorting
* (1, 2)
* (3, 5)
* (3, 8)
*
* BY $0 DESC, $1 ASC
*
* should return
* (3, 5)
* (3 ,8)
* (1, 2)
*
* @throws ExecException
*/
@Test
public void testPOSortMixAscDesc2() throws ExecException {
DataBag input = DefaultBagFactory.getInstance().newDefaultBag() ;
Tuple t1 = TupleFactory.getInstance().newTuple() ;
t1.append(1);
t1.append(2);
input.add(t1);
Tuple t2 = TupleFactory.getInstance().newTuple() ;
t2.append(3);
t2.append(5);
input.add(t2);
Tuple t3 = TupleFactory.getInstance().newTuple() ;
t3.append(3);
t3.append(8);
input.add(t3);
List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
pr1.setResultType(DataType.INTEGER);
PhysicalPlan expPlan1 = new PhysicalPlan();
expPlan1.add(pr1);
sortPlans.add(expPlan1);
POProject pr2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
pr2.setResultType(DataType.INTEGER);
PhysicalPlan expPlan2 = new PhysicalPlan();
expPlan2.add(pr2);
sortPlans.add(expPlan2);
List<Boolean> mAscCols = new LinkedList<Boolean>();
mAscCols.add(false);
mAscCols.add(true);
PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
inputs.add(read);
POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
sortPlans, mAscCols, null);
Tuple t = null;
Result res ;
// output line 1
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 3) ;
assertEquals(((Tuple) res.result).get(1), 5) ;
// output line 2
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 3) ;
assertEquals(((Tuple) res.result).get(1), 8) ;
// output line 3
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 1) ;
assertEquals(((Tuple) res.result).get(1), 2) ;
}
/***
* Sorting
* (null, 10)
* (1, 8)
* (1, null)
* (null,null)
* (3, 8)
*
* BY $0 DESC, $1 ASC
*
* should return
* (3, 8 )
* (1, null)
* (1, 8 )
* (null,null)
* (null, 10)
* @throws ExecException
*/
@Test
public void testPOSortMixAscDesc2Null() throws ExecException {
DataBag input = DefaultBagFactory.getInstance().newDefaultBag() ;
Tuple t1 = TupleFactory.getInstance().newTuple() ;
t1.append(null);
t1.append(10);
input.add(t1);
Tuple t2 = TupleFactory.getInstance().newTuple() ;
t2.append(1);
t2.append(8);
input.add(t2);
Tuple t3 = TupleFactory.getInstance().newTuple() ;
t3.append(1);
t3.append(null);
input.add(t3);
Tuple t4 = TupleFactory.getInstance().newTuple() ;
t4.append(null);
t4.append(null);
input.add(t4);
Tuple t5 = TupleFactory.getInstance().newTuple() ;
t5.append(3);
t5.append(8);
input.add(t5);
List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
pr1.setResultType(DataType.INTEGER);
PhysicalPlan expPlan1 = new PhysicalPlan();
expPlan1.add(pr1);
sortPlans.add(expPlan1);
POProject pr2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
pr2.setResultType(DataType.INTEGER);
PhysicalPlan expPlan2 = new PhysicalPlan();
expPlan2.add(pr2);
sortPlans.add(expPlan2);
List<Boolean> mAscCols = new LinkedList<Boolean>();
mAscCols.add(false);
mAscCols.add(true);
PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
inputs.add(read);
POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
sortPlans, mAscCols, null);
Tuple t = null;
Result res ;
// output line 1
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 3) ;
assertEquals(((Tuple) res.result).get(1), 8) ;
// output line 2
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 1) ;
assertNull(((Tuple) res.result).get(1)) ;
// output line 3
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 1) ;
assertEquals(((Tuple) res.result).get(1), 8) ;
// output line 4
res = sort.getNextTuple();
assertNull(((Tuple) res.result).get(0)) ;
assertNull(((Tuple) res.result).get(1)) ;
// output line 5
res = sort.getNextTuple();
assertNull(((Tuple) res.result).get(0)) ;
assertEquals(((Tuple) res.result).get(1), 10) ;
}
@Test
public void testPOSortWithLimit() throws ExecException {
DataBag input = DefaultBagFactory.getInstance().newDefaultBag();
Tuple t1 = TupleFactory.getInstance().newTuple() ;
t1.append(1);
t1.append(8);
input.add(t1);
Tuple t2 = TupleFactory.getInstance().newTuple() ;
t2.append(2);
t2.append(8);
input.add(t2);
Tuple t3 = TupleFactory.getInstance().newTuple() ;
t3.append(3);
t3.append(8);
input.add(t3);
List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
pr1.setResultType(DataType.INTEGER);
PhysicalPlan expPlan1 = new PhysicalPlan();
expPlan1.add(pr1);
sortPlans.add(expPlan1);
POProject pr2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
pr2.setResultType(DataType.INTEGER);
PhysicalPlan expPlan2 = new PhysicalPlan();
expPlan2.add(pr2);
sortPlans.add(expPlan2);
List<Boolean> mAscCols = new LinkedList<Boolean>();
mAscCols.add(false);
mAscCols.add(true);
PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
inputs.add(read);
POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
sortPlans, mAscCols, null);
sort.setLimit(1);
Result res;
res = sort.getNextTuple();
assertEquals(((Tuple) res.result).get(0), 3);
assertEquals(((Tuple) res.result).get(1), 8);
res = sort.getNextTuple();
assertEquals(res.returnStatus, POStatus.STATUS_EOP);
}
@Test
public void testPOSortUDF() throws ExecException {
DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
MAX_TUPLES, 100);
poSortUDFWithNull(input);
}
@Test
public void testPOSortUDFWithNull() throws ExecException {
DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBagWithNulls(r,
MAX_TUPLES, 100);
poSortUDFWithNull(input);
}
public void poSortUDFWithNull(DataBag input) throws ExecException {
PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
inputs.add(read);
String funcName = WeirdComparator.class.getName() + "()";
/*POUserFunc comparator = new POUserFunc(
new OperatorKey("", r.nextLong()), -1, inputs, funcName);*/
POUserComparisonFunc comparator = new POUserComparisonFunc(
new OperatorKey("", r.nextLong()), -1, null, new FuncSpec(funcName));
POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
null, null, comparator);
Tuple t = null;
Result res1 = sort.getNextTuple();
// System.out.println(res1.result);
Result res2 = sort.getNextTuple();
while (res2.returnStatus != POStatus.STATUS_EOP) {
int i1 = ((Integer) ((Tuple) res1.result).get(1) == null ? 0 : (Integer) ((Tuple) res1.result).get(1));
int i2 = ((Integer) ((Tuple) res2.result).get(1) == null ? 0 : (Integer) ((Tuple) res2.result).get(1));
int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
assertEquals(true, (i <= 0));
System.out.println(i + " : " + res2.result);
res1 = res2;
res2 = sort.getNextTuple();
}
}
// sorts values in ascending order of their distance from 50
public static class WeirdComparator extends ComparisonFunc {
@Override
public int compare(Tuple t1, Tuple t2) {
// TODO Auto-generated method stub
int result = 0;
try {
int i1 = ((Integer) t1.get(1) == null ? 0 : (Integer)t1.get(1));
int i2 = ((Integer) t2.get(1) == null ? 0 : (Integer)t2.get(1));
result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
} catch (ExecException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return result;
}
}
}