blob: b0ad0225bba19556e5353d653e40fdf662005201 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.junit.Before;
import org.junit.Test;
public class TestPOGenerate {
DataBag cogroup;
DataBag partialFlatten;
DataBag simpleGenerate;
DataBag mapFlatten;
Random r = new Random();
BagFactory bf = BagFactory.getInstance();
TupleFactory tf = TupleFactory.getInstance();
@Before
public void setUp() throws Exception {
Tuple [] inputA = new Tuple[4];
Tuple [] inputB = new Tuple[4];
Tuple [] inputC = new Tuple[4];
for(int i = 0; i < 4; i++) {
inputA[i] = tf.newTuple(2);
inputB[i] = tf.newTuple(1);
inputC[i] = tf.newTuple(2);
}
Map map0 = new HashMap<String,String>();
Map map1 = new HashMap<String,String>();
Map map2 = new HashMap<String,String>();
Map map3 = new HashMap<String,String>();
map0.put("A","");
map0.put("B","");
map1.put("A","a");
map1.put("B","b");
map2.put("A","aa");
map2.put("B","bb");
map3.put("A","aaa");
map3.put("B","bbb");
inputA[0].set(0, 'a');
inputA[0].set(1, '1');
inputA[1].set(0, 'b');
inputA[1].set(1, '1');
inputA[2].set(0, 'a');
inputA[2].set(1, '1');
inputA[3].set(0, 'c');
inputA[3].set(1, '1');
inputB[0].set(0, 'b');
inputB[1].set(0, 'b');
inputB[2].set(0, 'a');
inputB[3].set(0, 'd');
inputC[0].set(0, 0);
inputC[0].set(1, map0);
inputC[1].set(0, 1);
inputC[1].set(1, map1);
inputC[2].set(0, 2);
inputC[2].set(1, map2);
inputC[3].set(0, 3);
inputC[3].set(1, map3);
DataBag cg11 = bf.newDefaultBag();
cg11.add(inputA[0]);
cg11.add(inputA[2]);
DataBag cg21 = bf.newDefaultBag();
cg21.add(inputA[1]);
DataBag cg31 = bf.newDefaultBag();
cg31.add(inputA[3]);
DataBag emptyBag = bf.newDefaultBag();
DataBag cg12 = bf.newDefaultBag();
cg12.add(inputB[2]);
DataBag cg22 = bf.newDefaultBag();
cg22.add(inputB[0]);
cg22.add(inputB[1]);
DataBag cg42 = bf.newDefaultBag();
cg42.add(inputB[3]);
Tuple [] tIn = new Tuple[4];
for(int i = 0; i < 4; ++i) {
tIn[i] = tf.newTuple(2);
}
tIn[0].set(0, cg11);
tIn[0].set(1, cg12);
tIn[1].set(0, cg21);
tIn[1].set(1, cg22);
tIn[2].set(0, cg31);
tIn[2].set(1, emptyBag);
tIn[3].set(0, emptyBag);
tIn[3].set(1, cg42);
cogroup = bf.newDefaultBag();
for(int i = 0; i < 4; ++i) {
cogroup.add(tIn[i]);
}
Tuple[] tPartial = new Tuple[4];
for(int i = 0; i < 4; ++i) {
tPartial[i] = tf.newTuple(2);
tPartial[i].set(0, inputA[i].get(0));
tPartial[i].set(1, inputA[i].get(1));
}
tPartial[0].append(cg12);
tPartial[1].append(cg22);
tPartial[2].append(cg12);
tPartial[3].append(emptyBag);
partialFlatten = bf.newDefaultBag();
for (int i = 0; i < 4; ++i) {
partialFlatten.add(tPartial[i]);
}
simpleGenerate = bf.newDefaultBag();
for (int i = 0; i < 4; ++i) {
simpleGenerate.add(inputA[i]);
}
mapFlatten = bf.newDefaultBag();
for (int i = 0; i < inputC.length; ++i) {
mapFlatten.add(inputC[i]);
}
//System.out.println("Cogroup : " + cogroup);
//System.out.println("Partial : " + partialFlatten);
//System.out.println("Simple : " + simpleGenerate);
}
@Test
public void testJoin() throws Exception {
ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
prj1.setResultType(DataType.BAG);
prj2.setResultType(DataType.BAG);
List<Boolean> toBeFlattened = new LinkedList<Boolean>();
toBeFlattened.add(true);
toBeFlattened.add(true);
PhysicalPlan plan1 = new PhysicalPlan();
plan1.add(prj1);
PhysicalPlan plan2 = new PhysicalPlan();
plan2.add(prj2);
List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
inputs.add(plan1);
inputs.add(plan2);
PhysicalOperator poGen = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toBeFlattened);
//DataBag obtained = bf.newDefaultBag();
for (Tuple t : cogroup) {
/*plan1.attachInput(t);
plan2.attachInput(t);*/
poGen.attachInput(t);
Result output = poGen.getNextTuple();
while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
//System.out.println(output.result);
Tuple tObtained = (Tuple) output.result;
assertTrue(tObtained.get(0).toString().equals(tObtained.get(2).toString()));
//obtained.add((Tuple) output.result);
output = poGen.getNextTuple();
}
}
}
@Test
public void testPartialJoin() throws Exception {
ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
prj1.setResultType(DataType.BAG);
prj2.setResultType(DataType.BAG);
List<Boolean> toBeFlattened = new LinkedList<Boolean>();
toBeFlattened.add(true);
toBeFlattened.add(false);
PhysicalPlan plan1 = new PhysicalPlan();
plan1.add(prj1);
PhysicalPlan plan2 = new PhysicalPlan();
plan2.add(prj2);
List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
inputs.add(plan1);
inputs.add(plan2);
PhysicalOperator poGen = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toBeFlattened);
//DataBag obtained = bf.newDefaultBag();
List<String> obtained = new LinkedList<String>();
for (Tuple t : cogroup) {
/*plan1.attachInput(t);
plan2.attachInput(t);*/
poGen.attachInput(t);
Result output = poGen.getNextTuple();
while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
//System.out.println(output.result);
obtained.add(((Tuple) output.result).toString());
output = poGen.getNextTuple();
}
}
int count = 0;
for (Tuple t : partialFlatten) {
assertTrue(obtained.contains(t.toString()));
++count;
}
assertEquals(partialFlatten.size(), count);
}
@Test
public void testSimpleGenerate() throws Exception {
ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
prj1.setResultType(DataType.INTEGER);
prj2.setResultType(DataType.INTEGER);
List<Boolean> toBeFlattened = new LinkedList<Boolean>();
toBeFlattened.add(true);
toBeFlattened.add(false);
PhysicalPlan plan1 = new PhysicalPlan();
plan1.add(prj1);
PhysicalPlan plan2 = new PhysicalPlan();
plan2.add(prj2);
List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
inputs.add(plan1);
inputs.add(plan2);
PhysicalOperator poGen = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toBeFlattened);
//DataBag obtained = bf.newDefaultBag();
List<String> obtained = new LinkedList<String>();
for (Tuple t : simpleGenerate) {
/*plan1.attachInput(t);
plan2.attachInput(t);*/
poGen.attachInput(t);
Result output = poGen.getNextTuple();
while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
//System.out.println(output.result);
obtained.add(((Tuple) output.result).toString());
output = poGen.getNextTuple();
}
}
int count = 0;
for (Tuple t : simpleGenerate) {
assertTrue(obtained.contains(t.toString()));
++count;
}
assertEquals(simpleGenerate.size(), count);
}
@Test
public void testMapFlattenGenerate() throws Exception {
ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
prj1.setResultType(DataType.INTEGER);
prj2.setResultType(DataType.MAP);
List<Boolean> toBeFlattened = new LinkedList<Boolean>();
toBeFlattened.add(false);
toBeFlattened.add(true);
PhysicalPlan plan1 = new PhysicalPlan();
plan1.add(prj1);
PhysicalPlan plan2 = new PhysicalPlan();
plan2.add(prj2);
List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
inputs.add(plan1);
inputs.add(plan2);
PhysicalOperator poGen = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toBeFlattened);
List<String> obtained = new LinkedList<String>();
for (Tuple t : mapFlatten) {
poGen.attachInput(t);
Result output = poGen.getNextTuple();
while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
//System.out.println(output.result);
obtained.add(((Tuple) output.result).toString());
output = poGen.getNextTuple();
}
}
int count = 0;
for (Tuple t : mapFlatten) {
Tuple expected = tf.newTuple(3);
expected.set(0, t.get(0));
for (Object entryObj : ((Map)t.get(1)).entrySet()){
Map.Entry entry = ((Map.Entry)entryObj);
expected.set(1, entry.getKey());
expected.set(2, entry.getValue());
assertTrue(obtained.contains(expected.toString()));
++count;
}
}
assertEquals(mapFlatten.size()*2, count);
}
}