| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.test; |
| |
| import java.util.Iterator; |
| import java.util.Random; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.PrintStream; |
| |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.data.BagFactory; |
| import org.apache.pig.data.DataBag; |
| import org.apache.pig.data.DataType; |
| import org.apache.pig.data.DefaultBagFactory; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.data.TupleFactory; |
| import org.apache.pig.impl.plan.OperatorKey; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr; |
| import org.apache.pig.impl.plan.PlanException; |
| import org.apache.pig.test.utils.GenPhyOp; |
| import org.joda.time.DateTime; |
| import org.junit.Before; |
| |
| import junit.framework.TestCase; |
| |
| public class TestPOBinCond extends TestCase { |
| Random r = new Random(); |
| DataBag bag = BagFactory.getInstance().newDefaultBag(); |
| DataBag bagDefault = BagFactory.getInstance().newDefaultBag(); |
| DataBag bagWithNull = BagFactory.getInstance().newDefaultBag(); |
| DataBag bagWithBoolean = BagFactory.getInstance().newDefaultBag(); |
| DataBag bagWithBooleanAndNull = BagFactory.getInstance().newDefaultBag(); |
| |
| final int MAX = 10; |
| |
| /*** |
| * POBinCondition tests |
| * |
| * (r1, 1, 0 ) |
| * (r2, 1, 0 ) |
| * (r3, 1, 0 ) |
| * ... |
| * (rn, 1, 0 ) |
| * |
| * where r is a random number ( r1 .. rn ) |
| * |
| * The POBinCondition to test is: Integer(result)= ( r == 1 )? Ingeger(1), Ingeger(0); |
| * but the condition can be of any datatype: Interger, Float, Double... |
| * |
| * @throws ExecException |
| */ |
| |
| @Before |
| @Override |
| public void setUp() { |
| |
| //default bag as shown above |
| for(int i = 0; i < 10; i ++) { |
| Tuple t = TupleFactory.getInstance().newTuple(); |
| t.append(r.nextInt(2)); |
| t.append(1); |
| t.append(0); |
| bagDefault.add(t); |
| } |
| |
| //same as default bag but contains nulls |
| for(int i = 0; i < 10; i ++) { |
| Tuple t = TupleFactory.getInstance().newTuple(); |
| if (r.nextInt(4)%3 == 0){ |
| t.append(null); |
| |
| }else{ |
| t.append(r.nextInt(2)); |
| } |
| t.append(1); |
| t.append(0); |
| bagWithNull.add(t); |
| |
| } |
| |
| //r is a boolean |
| for(int i = 0; i < 10; i ++) { |
| Tuple t = TupleFactory.getInstance().newTuple(); |
| if (r.nextInt(2)%2 == 0 ){ |
| t.append(true); |
| } else { |
| t.append(false); |
| } |
| t.append(1); |
| t.append(0); |
| bagWithBoolean.add(t); |
| |
| } |
| |
| //r is a boolean with nulls |
| for(int i = 0; i < 10; i ++) { |
| |
| Tuple t = TupleFactory.getInstance().newTuple(); |
| if (r.nextInt(3)%2 == 0){ |
| |
| t.append(null); |
| |
| }else{ |
| |
| if (r.nextInt(2)%2 == 0 ){ |
| t.append(true); |
| } else { |
| t.append(false); |
| } |
| |
| } |
| t.append(1); |
| t.append(0); |
| bagWithBooleanAndNull.add(t); |
| |
| } |
| |
| |
| } |
| |
| /* ORIGINAL TEST |
| public void testPOBinCond() throws ExecException, PlanException { |
| ConstantExpression rt = (ConstantExpression) GenPhyOp.exprConst(); |
| rt.setValue(1); |
| rt.setResultType(DataType.INTEGER); |
| |
| POProject prj1 = GenPhyOp.exprProject(); |
| prj1.setColumn(0); |
| prj1.setResultType(DataType.INTEGER); |
| |
| EqualToExpr equal = (EqualToExpr) GenPhyOp.compEqualToExpr(); |
| equal.setLhs(prj1); |
| equal.setRhs(rt); |
| equal.setOperandType(DataType.INTEGER); |
| |
| POProject prjLhs = GenPhyOp.exprProject(); |
| prjLhs.setResultType(DataType.INTEGER); |
| prjLhs.setColumn(1); |
| |
| POProject prjRhs = GenPhyOp.exprProject(); |
| prjRhs.setResultType(DataType.INTEGER); |
| prjRhs.setColumn(2); |
| |
| POBinCond op = new POBinCond(new OperatorKey("", r.nextLong()), -1, equal, prjLhs, prjRhs); |
| op.setResultType(DataType.INTEGER); |
| |
| PhysicalPlan plan = new PhysicalPlan(); |
| plan.add(op); |
| plan.add(prjLhs); |
| plan.add(prjRhs); |
| plan.add(equal); |
| plan.connect(equal, op); |
| plan.connect(prjLhs, op); |
| plan.connect(prjRhs, op); |
| |
| plan.add(prj1); |
| plan.add(rt); |
| plan.connect(prj1, equal); |
| plan.connect(rt, equal); |
| |
| for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { |
| Tuple t = it.next(); |
| plan.attachInput(t); |
| Integer i = (Integer) t.get(0); |
| assertEquals(1, i | (Integer)op.getNext(i).result); |
| // System.out.println(t + " " + op.getNext(i).result.toString()); |
| } |
| |
| |
| } |
| */ |
| |
| public void testPOBinCondWithBoolean() throws ExecException, PlanException { |
| bag = getBag(DataType.BOOLEAN); |
| TestPoBinCondHelper testHelper = new TestPoBinCondHelper( |
| DataType.BOOLEAN, Boolean.TRUE); |
| |
| for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) { |
| Tuple t = it.next(); |
| testHelper.getPlan().attachInput(t); |
| Boolean value = (Boolean) t.get(0); |
| int expected = (value.booleanValue() == true) ? 1 : 0; |
| Integer dummy = new Integer(0); |
| Integer result = (Integer) testHelper.getOperator().getNext(dummy).result; |
| int actual = result.intValue(); |
| assertEquals(expected, actual); |
| } |
| } |
| |
| public void testPOBinCondWithInteger() throws ExecException, PlanException { |
| |
| bag= getBag(DataType.INTEGER); |
| TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.INTEGER, new Integer(1) ); |
| |
| for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { |
| Tuple t = it.next(); |
| testHelper.getPlan().attachInput(t); |
| Integer value = (Integer) t.get(0); |
| int expected = (value.intValue() == 1)? 1:0 ; |
| Integer result=(Integer)testHelper.getOperator().getNext(value).result; |
| int actual = result.intValue(); |
| assertEquals( expected, actual ); |
| } |
| |
| } |
| |
| public void testPOBinCondWithLong() throws ExecException, PlanException { |
| bag= getBag(DataType.LONG); |
| TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.LONG, new Long(1L) ); |
| |
| for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { |
| Tuple t = it.next(); |
| testHelper.getPlan().attachInput(t); |
| Long value = (Long) t.get(0); |
| int expected = (value.longValue() == 1L )? 1:0 ; |
| Integer dummy = new Integer(0); |
| Integer result=(Integer)testHelper.getOperator().getNext(dummy).result; |
| int actual = result.intValue(); |
| assertEquals( expected, actual ); |
| } |
| } |
| |
| public void testPOBinCondWithFloat() throws ExecException, PlanException { |
| |
| bag= getBag(DataType.FLOAT); |
| TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.FLOAT, new Float(1.0f) ); |
| |
| for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { |
| Tuple t = it.next(); |
| testHelper.getPlan().attachInput(t); |
| Float value = (Float) t.get(0); |
| int expected = (value.floatValue() == 1.0f )? 1:0 ; |
| Integer dummy = new Integer(0); |
| Integer result=(Integer)testHelper.getOperator().getNext(dummy).result; |
| int actual = result.intValue(); |
| assertEquals( expected, actual ); |
| } |
| |
| } |
| |
| public void testPOBinCondWithDouble() throws ExecException, PlanException { |
| |
| bag= getBag(DataType.DOUBLE); |
| TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.DOUBLE, new Double(1.0) ); |
| |
| for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { |
| Tuple t = it.next(); |
| testHelper.getPlan().attachInput(t); |
| Double value = (Double) t.get(0); |
| int expected = (value.doubleValue() == 1.0 )? 1:0 ; |
| Integer dummy = new Integer(0); |
| Integer result=(Integer)testHelper.getOperator().getNext(dummy).result; |
| int actual = result.intValue(); |
| assertEquals( expected, actual ); |
| } |
| |
| } |
| |
| public void testPOBinCondWithDateTime() throws ExecException, PlanException { |
| bag= getBag(DataType.DATETIME); |
| TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.DATETIME, new DateTime(1L) ); |
| |
| for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { |
| Tuple t = it.next(); |
| testHelper.getPlan().attachInput(t); |
| DateTime value = (DateTime) t.get(0); |
| int expected = value.equals(new DateTime(1L))? 1:0 ; |
| Integer dummy = new Integer(0); |
| Integer result=(Integer)testHelper.getOperator().getNext(dummy).result; |
| int actual = result.intValue(); |
| assertEquals( expected, actual ); |
| } |
| } |
| |
| public void testPOBinCondIntWithNull() throws ExecException, PlanException { |
| |
| bag= getBagWithNulls(DataType.INTEGER); |
| TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.INTEGER, new Integer(1) ); |
| |
| for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { |
| Tuple t = it.next(); |
| testHelper.getPlan().attachInput(t); |
| Integer value = null; |
| Integer result; |
| |
| if (t.get(0) != null) { |
| value = (Integer) t.get(0); |
| result = (Integer) testHelper.getOperator().getNext(value).result; |
| } else { |
| result = (Integer) testHelper.getOperator().getNext( |
| (Integer) null).result; |
| } |
| int actual; |
| if (value != null) { |
| int expected = (value.intValue() == 1) ? 1 : 0; |
| actual = result.intValue(); |
| assertEquals(expected, actual); |
| } else { |
| assertEquals(null, result); |
| } |
| |
| } |
| |
| } |
| |
| public void testPOBinCondLongWithNull() throws ExecException, PlanException { |
| |
| bag= getBagWithNulls(DataType.LONG); |
| TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.LONG, new Long(1L) ); |
| |
| for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { |
| Tuple t = it.next(); |
| testHelper.getPlan().attachInput(t); |
| |
| Long value=null; |
| if ( t.get(0)!=null){ |
| value = (Long) t.get(0); |
| } |
| Integer dummy = new Integer(0); |
| Integer result=(Integer)testHelper.getOperator().getNext(dummy).result; |
| int expected; |
| int actual; |
| if ( value!=null ) { |
| expected=(value.intValue() == 1)? 1:0 ; |
| actual = result.intValue(); |
| assertEquals( expected, actual ); |
| } else { |
| assertEquals( null, result ); |
| } |
| } |
| } |
| |
| public void testPOBinCondDoubleWithNull() throws ExecException, PlanException { |
| |
| bag= getBagWithNulls(DataType.DOUBLE); |
| TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.DOUBLE, new Double(1.0) ); |
| |
| for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { |
| Tuple t = it.next(); |
| testHelper.getPlan().attachInput(t); |
| |
| Double value=null; |
| if ( t.get(0)!=null){ |
| value = (Double) t.get(0); |
| } |
| Integer dummy = new Integer(0); |
| Integer result=(Integer)testHelper.getOperator().getNext(dummy).result; |
| |
| int expected; |
| int actual; |
| if ( value!=null ) { |
| expected=(value.intValue() == 1)? 1:0 ; |
| actual = result.intValue(); |
| assertEquals( expected, actual ); |
| } else { |
| assertEquals( null, result ); |
| } |
| |
| |
| } |
| |
| } |
| |
| public void testPOBinCondDateTimeWithNull() throws ExecException, PlanException { |
| |
| bag= getBagWithNulls(DataType.DATETIME); |
| TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.DATETIME, new DateTime(1L) ); |
| |
| for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { |
| Tuple t = it.next(); |
| testHelper.getPlan().attachInput(t); |
| |
| DateTime value=null; |
| if ( t.get(0)!=null){ |
| value = (DateTime) t.get(0); |
| } |
| Integer dummy = new Integer(0); |
| Integer result=(Integer)testHelper.getOperator().getNext(dummy).result; |
| int expected; |
| int actual; |
| if ( value!=null ) { |
| expected=value.equals(new DateTime(1L))? 1:0 ; |
| actual = result.intValue(); |
| assertEquals( expected, actual ); |
| } else { |
| assertEquals( null, result ); |
| } |
| } |
| } |
| |
| protected class TestPoBinCondHelper { |
| |
| PhysicalPlan plan= null; |
| POBinCond op= null; |
| |
| |
| public <U> TestPoBinCondHelper( byte type, U value ) throws ExecException, PlanException { |
| |
| |
| ConstantExpression rt = (ConstantExpression) GenPhyOp.exprConst(); |
| rt.setValue(value); |
| rt.setResultType(type); |
| |
| POProject prj1 = GenPhyOp.exprProject(); |
| prj1.setColumn(0); |
| prj1.setResultType(type); |
| |
| |
| EqualToExpr equal = (EqualToExpr) GenPhyOp.compEqualToExpr(); |
| equal.setLhs(prj1); |
| equal.setRhs(rt); |
| equal.setOperandType(type); |
| |
| POProject prjLhs = GenPhyOp.exprProject(); |
| prjLhs.setResultType(DataType.INTEGER); |
| prjLhs.setColumn(1); |
| |
| POProject prjRhs =prjRhs = GenPhyOp.exprProject(); |
| prjRhs.setResultType(DataType.INTEGER); |
| prjRhs.setColumn(2); |
| |
| op = new POBinCond(new OperatorKey("", r.nextLong()), -1, equal, prjLhs, prjRhs); |
| op.setResultType(DataType.INTEGER); |
| |
| plan= new PhysicalPlan(); |
| plan.add(op); |
| plan.add(prjLhs); |
| plan.add(prjRhs); |
| plan.add(equal); |
| plan.connect(equal, op); |
| plan.connect(prjLhs, op); |
| plan.connect(prjRhs, op); |
| |
| plan.add(prj1); |
| plan.add(rt); |
| plan.connect(prj1, equal); |
| plan.connect(rt, equal); |
| |
| // File tmpFile = File.createTempFile("test", ".txt" ); |
| //PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); |
| //plan.explain(ps); |
| //ps.close(); |
| |
| } |
| |
| public PhysicalPlan getPlan(){ |
| return plan; |
| } |
| |
| |
| public POBinCond getOperator(){ |
| return op; |
| } |
| |
| |
| } |
| |
| private DataBag getBag(byte type) { |
| DataBag bag = DefaultBagFactory.getInstance().newDefaultBag(); |
| for(int i = 0; i < 10; i ++) { |
| Tuple t = TupleFactory.getInstance().newTuple(); |
| switch(type) { |
| case DataType.BOOLEAN: |
| t.append(r.nextBoolean()); |
| break; |
| case DataType.INTEGER: |
| t.append(r.nextInt(2)); |
| break; |
| case DataType.LONG: |
| t.append(r.nextLong() % 2L); |
| break; |
| case DataType.FLOAT: |
| t.append((i % 2 == 0 ? 1.0f : 0.0f)); |
| break; |
| case DataType.DOUBLE: |
| t.append((i % 2 == 0 ? 1.0 : 0.0)); |
| break; |
| case DataType.DATETIME: |
| t.append(new DateTime(r.nextLong() % 2L)); |
| break; |
| } |
| t.append(1); |
| t.append(0); |
| bag.add(t); |
| } |
| return bag; |
| } |
| |
| private DataBag getBagWithNulls(byte type) { |
| DataBag bag = DefaultBagFactory.getInstance().newDefaultBag(); |
| for(int i = 0; i < 10; i ++) { |
| Tuple t = TupleFactory.getInstance().newTuple(); |
| if (r.nextInt(4)%3 == 0){ |
| t.append(null); |
| }else{ |
| switch(type) { |
| case DataType.BOOLEAN: |
| t.append(r.nextBoolean()); |
| break; |
| case DataType.INTEGER: |
| t.append(r.nextInt(2)); |
| break; |
| case DataType.LONG: |
| t.append(r.nextLong() % 2L); |
| break; |
| case DataType.FLOAT: |
| t.append( (i % 2 == 0 ? 1.0f : 0.0f)); |
| break; |
| case DataType.DOUBLE: |
| t.append( (i % 2 == 0 ? 1.0 : 0.0)); |
| break; |
| case DataType.DATETIME: |
| t.append(new DateTime(r.nextLong() % 2L)); |
| break; |
| } |
| } |
| t.append(1); |
| t.append(0); |
| bag.add(t); |
| } |
| return bag; |
| } |
| } |