| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.pig.test; |
import static org.apache.pig.builtin.mock.Storage.resetData;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.List;
import java.util.Map;
import java.util.Set;

import junit.framework.Assert;

import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.newplan.Operator;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
| |
| public class TestCubeOperator { |
| private static PigServer pigServer; |
| private static TupleFactory tf = TupleFactory.getInstance(); |
| private Data data; |
| |
| @BeforeClass |
| public static void oneTimeSetUp() throws Exception { |
| pigServer = new PigServer("local"); |
| } |
| |
| @Before |
| public void setUp() throws Exception { |
| |
| data = resetData(pigServer); |
| data.set("input", tuple("dog", "miami", 12), tuple("cat", "miami", 18), |
| tuple("turtle", "tampa", 4), tuple("dog", "tampa", 14), tuple("cat", "naples", 9), |
| tuple("dog", "naples", 5), tuple("turtle", "naples", 1)); |
| |
| data.set("input1", tuple("u1,men,green,mango"), tuple("u2,men,red,mango"), |
| tuple("u3,men,green,apple"), tuple("u4,women,red,mango"), |
| tuple("u6,women,green,mango"), tuple("u7,men,red,apple"), |
| tuple("u8,men,green,mango"), tuple("u9,women,red,apple"), |
| tuple("u10,women,green,apple"), tuple("u11,men,red,apple"), |
| tuple("u12,women,green,mango")); |
| |
| data.set("input2", tuple("dog", "miami", "white", "pet", 5)); |
| |
| data.set("input3", tuple("dog", "miami", 12), tuple(null, "miami", 18)); |
| |
| } |
| |
| @AfterClass |
| public static void oneTimeTearDown() throws IOException { |
| } |
| |
| @Test |
| public void testCubeBasic() throws IOException { |
| // basic correctness test |
| String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" |
| + "b = cube a by cube(x,y);" |
| + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;\n" |
| + "store c into 'output' using mock.Storage();"; |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), |
| tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), |
| tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), |
| tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), |
| tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), |
| tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), |
| tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), |
| tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), |
| tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), |
| tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| |
| } |
| |
| @Test |
| public void testRollupBasic() throws IOException { |
| // basic correctness test |
| String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" |
| + "b = cube a by rollup(x,y);" |
| + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;" |
| + "store c into 'output' using mock.Storage();"; |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), |
| tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), |
| tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), |
| tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), |
| tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), |
| tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), |
| tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| } |
| |
| @Test |
| public void testCubeAndRollup() throws IOException { |
| // basic correctness test |
| String query = "a = load 'input2' USING mock.Storage() as (v:chararray,w:chararray,x:chararray,y:chararray,z:long);" |
| + "b = cube a by cube(v,w), rollup(x,y);" |
| + "c = foreach b generate flatten(group) as (type,location,color,category), COUNT_STAR(cube) as count, SUM(cube.z) as total;" |
| + "store c into 'output' using mock.Storage();"; |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet |
| .of(tf.newTuple(Lists.newArrayList("dog", "miami", "white", "pet", (long) 1, |
| (long) 5)), tf.newTuple(Lists.newArrayList("dog", null, "white", "pet", |
| (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(null, "miami", |
| "white", "pet", (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(null, |
| null, "white", "pet", (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList( |
| "dog", "miami", "white", null, (long) 1, (long) 5)), tf.newTuple(Lists |
| .newArrayList("dog", null, "white", null, (long) 1, (long) 5)), tf |
| .newTuple(Lists.newArrayList(null, "miami", "white", null, (long) 1, |
| (long) 5)), tf.newTuple(Lists.newArrayList(null, null, "white", |
| null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList("dog", "miami", |
| null, null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList("dog", |
| null, null, null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList( |
| null, "miami", null, null, (long) 1, (long) 5)), tf.newTuple(Lists |
| .newArrayList(null, null, null, null, (long) 1, (long) 5))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| |
| } |
| |
| @Test |
| public void testCubeMultipleIAliases() throws IOException { |
| // test for input alias to cube being assigned multiple times |
| String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" |
| + "a = load 'input' USING mock.Storage() as (x,y:chararray,z:long);" |
| + "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" |
| + "b = cube a by cube(x,y);" |
| + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;" |
| + "store c into 'output' using mock.Storage();"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), |
| tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), |
| tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), |
| tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), |
| tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), |
| tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), |
| tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), |
| tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), |
| tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), |
| tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| |
| } |
| |
| @Test |
| public void testCubeAfterForeach() throws IOException { |
| // test for foreach projection before cube operator |
| String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" |
| + "b = foreach a generate x as type,y as location,z as number;" |
| + "c = cube b by cube(type,location);" |
| + "d = foreach c generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.number) as total;" |
| + "store d into 'output' using mock.Storage();"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), |
| tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), |
| tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), |
| tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), |
| tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), |
| tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), |
| tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), |
| tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), |
| tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), |
| tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| |
| } |
| |
| @Test |
| public void testCubeAfterLimit() throws IOException { |
| // test for limit operator before cube operator |
| String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" |
| + "b = limit a 2;" + "c = cube b by cube(x,y);" |
| + "d = foreach c generate flatten(group) as (x,y), SUM(cube.z) as total;" |
| + "store d into 'output' using mock.Storage();"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("cat", "miami", (long) 18)), |
| tf.newTuple(Lists.newArrayList("cat", null, (long) 18)), |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 12)), |
| tf.newTuple(Lists.newArrayList(null, "miami", (long) 30)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 30))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| |
| } |
| |
| @Test |
| public void testCubeWithStar() throws IOException { |
| // test for * (all) dimensions in cube operator |
| String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray);" |
| + "b = foreach a generate x as type,y as location;" |
| + "c = cube b by cube(*);" |
| + "d = foreach c generate flatten(group) as (type,location), COUNT_STAR(cube) as count;" |
| + "store d into 'output' using mock.Storage();"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1)), |
| tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1)), |
| tf.newTuple(Lists.newArrayList("cat", null, (long) 2)), |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1)), |
| tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1)), |
| tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 3)), |
| tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1)), |
| tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1)), |
| tf.newTuple(Lists.newArrayList("turtle", null, (long) 2)), |
| tf.newTuple(Lists.newArrayList(null, "miami", (long) 2)), |
| tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2)), |
| tf.newTuple(Lists.newArrayList(null, "naples", (long) 3)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 7))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| |
| } |
| |
| @Test |
| public void testCubeWithRange() throws IOException { |
| // test for range projection of dimensions in cube operator |
| String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" |
| + "b = foreach a generate x as type,y as location, z as number;" |
| + "c = cube b by cube($0..$1);" |
| + "d = foreach c generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.number) as total;" |
| + "store d into 'output' using mock.Storage();"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), |
| tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), |
| tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), |
| tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), |
| tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), |
| tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), |
| tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), |
| tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), |
| tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), |
| tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| |
| } |
| |
| @Test |
| public void testCubeDuplicateDimensions() throws IOException { |
| // test for cube operator with duplicate dimensions |
| String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" |
| + "b = foreach a generate x as type,y as location, z as number;" |
| + "c = cube b by cube($0..$1,$0..$1);" |
| + "d = foreach c generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.number) as total;" |
| + "store d into 'output' using mock.Storage();"; |
| |
| try { |
| Util.registerMultiLineQuery(pigServer, query); |
| pigServer.openIterator("d"); |
| } catch (FrontendException e) { |
| // FEException with 'duplicate dimensions detected' message is throw |
| return; |
| } |
| |
| Assert.fail("Expected to throw an exception when duplicate dimensions are detected!"); |
| |
| } |
| |
| @Test |
| public void testCubeAfterFilter() throws IOException { |
| // test for filtering before cube operator |
| String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" |
| + "b = filter a by x == 'dog';" |
| + "c = cube b by cube(x,y);" |
| + "d = foreach c generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.z) as total;" |
| + "store d into 'output' using mock.Storage();"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| // Iterator<Tuple> it = pigServer.openIterator("d"); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), |
| tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), |
| tf.newTuple(Lists.newArrayList(null, "miami", (long) 1, (long) 12)), |
| tf.newTuple(Lists.newArrayList(null, "tampa", (long) 1, (long) 14)), |
| tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, (long) 5)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 3, (long) 31))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| |
| } |
| |
| @Test |
| public void testCubeAfterOrder() throws IOException { |
| // test for ordering before cube operator |
| String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" |
| + "b = order a by $2;" |
| + "c = cube b by cube(x,y);" |
| + "d = foreach c generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.z) as total;" |
| + "store d into 'output' using mock.Storage();"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), |
| tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), |
| tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), |
| tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), |
| tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), |
| tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), |
| tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), |
| tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), |
| tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), |
| tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| } |
| |
| @Test |
| public void testCubeAfterJoin() throws IOException { |
| // test for cubing on joined relations |
| String query = "a = load 'input1' USING mock.Storage() as (a1:chararray,b1,c1,d1); " |
| + "b = load 'input' USING mock.Storage() as (a2,b2,c2:long,d2:chararray);" |
| + "c = join a by a1, b by d2;" |
| + "d = cube c by cube($4,$5);" |
| + "e = foreach d generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.c2) as total;" |
| + "store e into 'output' using mock.Storage();"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), |
| tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 18)), |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 26)), |
| tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), |
| tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), |
| tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), |
| tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), |
| tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), |
| tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, (long) 1)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 49))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| } |
| |
| @Test |
| public void testCubeAfterCogroup() throws IOException { |
| // test for cubing on co-grouped relation |
| String query = "a = load 'input1' USING mock.Storage() as (a1:chararray,b1,c1,d1); " |
| + "b = load 'input' USING mock.Storage() as (a2,b2,c2:long,d2:chararray);" |
| + "c = cogroup a by a1, b by d2;" |
| + "d = foreach c generate flatten(a), flatten(b);" |
| + "e = cube d by cube(a2,b2);" |
| + "f = foreach e generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.c2) as total;" |
| + "store f into 'output' using mock.Storage();"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), |
| tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 18)), |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 26)), |
| tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), |
| tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), |
| tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), |
| tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), |
| tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), |
| tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, (long) 1)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 49))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| } |
| |
| @Test |
| public void testCubeWithNULLs() throws IOException { |
| // test for dimension values with legitimate null values |
| String query = "a = load 'input3' USING mock.Storage() as (x:chararray,y:chararray,z:long);" |
| + "b = cube a by cube(x,y);" |
| + "c = foreach b generate flatten(group) as (type,location), SUM(cube.z) as total;" |
| + "store c into 'output' using mock.Storage();"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 12)), |
| tf.newTuple(Lists.newArrayList(null, "miami", (long) 30)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 30)), |
| tf.newTuple(Lists.newArrayList("unknown", "miami", (long) 18)), |
| tf.newTuple(Lists.newArrayList("unknown", null, (long) 18))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| |
| } |
| |
| @Test |
| public void testCubeWithNULLAndFilter() throws IOException { |
| // test for dimension values with legitimate null values |
| // followed by filter |
| String query = "a = load 'input3' USING mock.Storage() as (x:chararray,y:chararray,z:long);" |
| + "b = cube a by cube(x,y);" |
| + "c = foreach b generate flatten(group) as (type,location), SUM(cube.z) as total;" |
| + "d = filter c by type!='unknown';" |
| + "store d into 'output' using mock.Storage();"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 12))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| |
| } |
| |
| @Test |
| public void testRollupAfterCogroup() throws IOException { |
| // test for cubing on co-grouped relation |
| String query = "a = load 'input1' USING mock.Storage() as (a1:chararray,b1,c1,d1); " |
| + "b = load 'input' USING mock.Storage() as (a2,b2,c2:long,d2:chararray);" |
| + "c = cogroup a by a1, b by d2;" |
| + "d = foreach c generate flatten(a), flatten(b);" |
| + "e = cube d by rollup(a2,b2);" |
| + "f = foreach e generate flatten(group), COUNT(cube) as count, SUM(cube.c2) as total;" |
| + "store f into 'output' using mock.Storage();"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| |
| Set<Tuple> expected = ImmutableSet.of( |
| tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), |
| tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 18)), |
| tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), |
| tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), |
| tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 26)), |
| tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), |
| tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), |
| tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), |
| tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 49))); |
| |
| List<Tuple> out = data.get("output"); |
| for (Tuple tup : out) { |
| assertTrue(expected + " contains " + tup, expected.contains(tup)); |
| } |
| } |
| |
| @Test |
| public void testIllustrate() throws IOException { |
| // test for illustrate |
| String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " |
| + "b = cube a by cube(a1,b1);"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| Map<Operator, DataBag> examples = pigServer.getExamples("b"); |
| assertTrue(examples != null); |
| } |
| |
| @Test |
| public void testExplainCube() throws IOException { |
| // test for explain |
| String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " |
| + "b = cube a by cube(a1,b1);"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| PrintStream ps = new PrintStream(baos); |
| pigServer.explain("b", ps); |
| assertTrue(baos.toString().contains("CubeDimensions")); |
| } |
| |
| @Test |
| public void testExplainRollup() throws IOException { |
| // test for explain |
| String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " |
| + "b = cube a by rollup(a1,b1);"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| PrintStream ps = new PrintStream(baos); |
| pigServer.explain("b", ps); |
| assertTrue(baos.toString().contains("RollupDimensions")); |
| } |
| |
| @Test |
| public void testDescribe() throws IOException { |
| // test for describe |
| String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " |
| + "b = cube a by cube(a1,b1);"; |
| |
| Util.registerMultiLineQuery(pigServer, query); |
| Schema sch = pigServer.dumpSchema("b"); |
| for (String alias : sch.getAliases()) { |
| if (alias.compareTo("cube") == 0) { |
| assertTrue(alias.contains("cube")); |
| } |
| } |
| } |
| } |