| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.test; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import org.apache.pig.PigServer; |
| import org.apache.pig.builtin.mock.Storage; |
| import org.apache.pig.data.BagFactory; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.data.TupleFactory; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class TestScalarAliasesLocal { |
| private static final String BUILD_TEST_TMP = "build/test/tmp/"; |
| private PigServer pigServer; |
| |
| TupleFactory mTf = TupleFactory.getInstance(); |
| BagFactory mBf = BagFactory.getInstance(); |
| |
| @Before |
| public void setUp() throws Exception{ |
| pigServer = new PigServer(Util.getLocalTestMode()); |
| } |
| |
| public static void deleteDirectory(File file) { |
| if (file.exists()) { |
| Util.deleteDirectory(file); |
| } |
| } |
| |
| public static File createLocalInputFile(String filename, String[] inputData) |
| throws IOException { |
| new File(filename).getParentFile().mkdirs(); |
| return Util.createLocalInputFile(filename, inputData); |
| } |
| |
| // See PIG-1434 |
| @Test |
| public void testScalarAliasesBatchNobatch() throws Exception{ |
| String[] input = { |
| "1\t5", |
| "2\t10", |
| "3\t20" |
| }; |
| |
| String output = BUILD_TEST_TMP+"table_testScalarAliasesDir"; |
| TestScalarAliases.deleteDirectory(new File(output)); |
| // Test the use of scalars in expressions |
| String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesBatch"; |
| TestScalarAliases.createLocalInputFile(inputPath, input); |
| // Test in script mode |
| pigServer.setBatchOn(); |
| pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0: long, a1: double);"); |
| pigServer.registerQuery("B = group A all;"); |
| pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;"); |
| pigServer.registerQuery("Y = foreach A generate (a0 * C.count), (a1 / C.max);"); |
| pigServer.registerQuery("Store Y into '" + output + "';"); |
| pigServer.executeBatch(); |
| // Check output |
| pigServer.registerQuery("Z = LOAD '" + output + "' as (a0: int, a1: double);"); |
| |
| Iterator<Tuple> iter; |
| Tuple t; |
| iter = pigServer.openIterator("Z"); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(3,0.25)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(6,0.5)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(9,1.0)")); |
| |
| assertFalse(iter.hasNext()); |
| |
| iter = pigServer.openIterator("Y"); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(3,0.25)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(6,0.5)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(9,1.0)")); |
| |
| assertFalse(iter.hasNext()); |
| |
| |
| } |
| |
| // See PIG-1434 |
| @Test |
| public void testUseScalarMultipleTimes() throws Exception{ |
| String[] input = { |
| "1\t5", |
| "2\t10", |
| "3\t20" |
| }; |
| |
| String outputY = BUILD_TEST_TMP+"table_testUseScalarMultipleTimesOutY"; |
| TestScalarAliases.deleteDirectory(new File(outputY)); |
| String outputZ = BUILD_TEST_TMP+"table_testUseScalarMultipleTimesOutZ"; |
| TestScalarAliases.deleteDirectory(new File(outputZ)); |
| // Test the use of scalars in expressions |
| String inputPath = BUILD_TEST_TMP+"table_testUseScalarMultipleTimes"; |
| TestScalarAliases.createLocalInputFile(inputPath, input); |
| pigServer.setBatchOn(); |
| pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0: long, a1: double);"); |
| pigServer.registerQuery("B = group A all;"); |
| pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;"); |
| pigServer.registerQuery("Y = foreach A generate (a0 * C.count), (a1 / C.max);"); |
| pigServer.registerQuery("Store Y into '" + outputY + "';"); |
| pigServer.registerQuery("Z = foreach A generate (a1 + C.count), (a0 * C.max);"); |
| pigServer.registerQuery("Store Z into '" + outputZ + "';"); |
| // Test Multiquery store |
| pigServer.executeBatch(); |
| |
| // Check output |
| pigServer.registerQuery("M = LOAD '" + outputY + "' as (a0: int, a1: double);"); |
| |
| Iterator<Tuple> iter; |
| Tuple t; |
| iter = pigServer.openIterator("M"); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(3,0.25)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(6,0.5)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(9,1.0)")); |
| |
| assertFalse(iter.hasNext()); |
| |
| // Check output |
| pigServer.registerQuery("N = LOAD '" + outputZ + "' as (a0: double, a1: double);"); |
| |
| iter = pigServer.openIterator("N"); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(8.0,20.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(13.0,40.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(23.0,60.0)")); |
| |
| assertFalse(iter.hasNext()); |
| |
| // Non batch mode |
| iter = pigServer.openIterator("Y"); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(3,0.25)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(6,0.5)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(9,1.0)")); |
| |
| assertFalse(iter.hasNext()); |
| |
| // Check in non-batch mode |
| iter = pigServer.openIterator("Z"); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(8.0,20.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(13.0,40.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(23.0,60.0)")); |
| |
| assertFalse(iter.hasNext()); |
| |
| |
| } |
| |
| // See PIG-1434 |
| @Test |
| public void testScalarWithNoSchema() throws Exception{ |
| String[] scalarInput = { |
| "1\t5" |
| }; |
| String[] input = { |
| "1\t5", |
| "2\t10", |
| "3\t20" |
| }; |
| String inputPath = BUILD_TEST_TMP+"table_testScalarWithNoSchema"; |
| TestScalarAliases.createLocalInputFile(inputPath, input); |
| String inputPathScalar = BUILD_TEST_TMP+"table_testScalarWithNoSchemaScalar"; |
| TestScalarAliases.createLocalInputFile(inputPathScalar, scalarInput); |
| // Load A as a scalar |
| pigServer.registerQuery("A = LOAD '" + inputPath + "';"); |
| pigServer.registerQuery("scalar = LOAD '" + inputPathScalar + "' as (count, total);"); |
| pigServer.registerQuery("B = foreach A generate 5 / scalar.total;"); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("B"); |
| |
| Tuple t = iter.next(); |
| assertTrue(t.get(0).toString().equals("1")); |
| |
| t = iter.next(); |
| assertTrue(t.get(0).toString().equals("1")); |
| |
| t = iter.next(); |
| assertTrue(t.get(0).toString().equals("1")); |
| |
| assertFalse(iter.hasNext()); |
| |
| } |
| |
| // See PIG-1434 |
| @Test |
| public void testScalarWithTwoBranches() throws Exception{ |
| String[] inputA = { |
| "1\t5", |
| "2\t10", |
| "3\t20" |
| }; |
| |
| String[] inputX = { |
| "pig", |
| "hadoop", |
| "rocks" |
| }; |
| |
| String output = BUILD_TEST_TMP+"testScalarWithTwoBranchesDir"; |
| TestScalarAliases.deleteDirectory(new File(output)); |
| // Test the use of scalars in expressions |
| String inputPathA = BUILD_TEST_TMP+"testScalarWithTwoBranchesA"; |
| TestScalarAliases.createLocalInputFile(inputPathA, inputA); |
| String inputPathX = BUILD_TEST_TMP+"testScalarWithTwoBranchesX"; |
| TestScalarAliases.createLocalInputFile(inputPathX, inputX); |
| // Test in script mode |
| pigServer.setBatchOn(); |
| pigServer.registerQuery("A = LOAD '" + inputPathA + "' as (a0: long, a1: double);"); |
| pigServer.registerQuery("B = group A all;"); |
| pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;"); |
| pigServer.registerQuery("X = LOAD '" + inputPathX + "' as (names: chararray);"); |
| pigServer.registerQuery("Y = foreach X generate names, C.max;"); |
| pigServer.registerQuery("Store Y into '" + output + "';"); |
| pigServer.executeBatch(); |
| // Check output |
| pigServer.registerQuery("Z = LOAD '" + output + "' as (a0: chararray, a1: double);"); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("Z"); |
| |
| Tuple t = iter.next(); |
| assertTrue(t.toString().equals("(pig,20.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(hadoop,20.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(rocks,20.0)")); |
| |
| assertFalse(iter.hasNext()); |
| |
| // Check in non-batch mode |
| iter = pigServer.openIterator("Y"); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(pig,20.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(hadoop,20.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(rocks,20.0)")); |
| |
| assertFalse(iter.hasNext()); |
| |
| pigServer.getPigContext().getProperties().remove("tez.am.inline.task.execution.max-tasks"); |
| |
| } |
| |
| // See PIG-1434 |
| @Test |
| public void testFilteredScalarDollarProj() throws Exception{ |
| String output = BUILD_TEST_TMP+"table_testFilteredScalarDollarProjDir"; |
| TestScalarAliases.deleteDirectory(new File(output)); |
| String[] input = { |
| "1\t5\t[state#maine,city#portland]\t{(a),(b)}\t(a,b)", |
| "2\t10\t\t\t", |
| "3\t20\t\t\t" |
| }; |
| |
| // Test the use of scalars in expressions |
| String inputPath = BUILD_TEST_TMP+"table_testFilteredScalarDollarProj"; |
| TestScalarAliases.createLocalInputFile(inputPath, input); |
| // Test in script mode |
| pigServer.setBatchOn(); |
| pigServer.registerQuery("A = LOAD '" + inputPath + "'" + " as (a0: long, a1: double, a2 : bytearray, " + "a3: bag{ t : tuple(tc : chararray)}, " + "a4: tuple(c1 : chararray, c2 : chararray) );"); |
| pigServer.registerQuery("B = filter A by $1 < 8;"); |
| pigServer.registerQuery("Y = foreach A generate (a0 * B.$0), (a1 / B.$1), B.$2, B.$2#'state', B.$3, B.a4;"); |
| pigServer.registerQuery("Store Y into '" + output + "';"); |
| pigServer.explain("Y", System.err); |
| pigServer.executeBatch(); |
| // Check output |
| pigServer.registerQuery("Z = LOAD '" + output + "' as (a0: int, a1: double);"); |
| pigServer.explain("Z", System.err); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("Z"); |
| |
| Tuple t = iter.next(); |
| assertTrue(t.toString().equals("(1,1.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(2,2.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(3,4.0)")); |
| |
| assertFalse(iter.hasNext()); |
| |
| // Check in non-batch mode |
| iter = pigServer.openIterator("Y"); |
| |
| t = iter.next(); |
| assertEquals(t.toString(),"(1,1.0,[state#maine,city#portland],maine,{(a),(b)},(a,b))"); |
| |
| t = iter.next(); |
| assertEquals(t.toString(),"(2,2.0,[state#maine,city#portland],maine,{(a),(b)},(a,b))"); |
| |
| t = iter.next(); |
| assertEquals(t.toString(),"(3,4.0,[state#maine,city#portland],maine,{(a),(b)},(a,b))"); |
| |
| assertFalse(iter.hasNext()); |
| |
| |
| } |
| |
| // See PIG-1434 |
| @Test |
| public void testScalarWithNoSchemaDollarProj() throws Exception{ |
| String[] scalarInput = { |
| "1\t5" |
| }; |
| String[] input = { |
| "1\t5", |
| "2\t10", |
| "3\t20" |
| }; |
| String inputPath = BUILD_TEST_TMP+"table_testScalarWithNoSchemaDollarProj"; |
| TestScalarAliases.createLocalInputFile(inputPath, input); |
| String inputPathScalar = BUILD_TEST_TMP+"table_testScalarWithNoSchemaDollarProjScalar"; |
| TestScalarAliases.createLocalInputFile(inputPathScalar, scalarInput); |
| // Load A as a scalar |
| pigServer.registerQuery("A = LOAD '" + inputPath + "';"); |
| pigServer.registerQuery("scalar = LOAD '" + inputPathScalar + "';"); |
| pigServer.registerQuery("B = foreach A generate 5 / scalar.$1;"); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("B"); |
| |
| Tuple t = iter.next(); |
| assertTrue(t.get(0).toString().equals("1")); |
| |
| t = iter.next(); |
| assertTrue(t.get(0).toString().equals("1")); |
| |
| t = iter.next(); |
| assertTrue(t.get(0).toString().equals("1")); |
| |
| assertFalse(iter.hasNext()); |
| |
| } |
| |
| // See PIG-1434 |
| @Test |
| public void testScalarAliasesJoinClause() throws Exception{ |
| String[] inputA = { |
| "1\t5", |
| "2\t10", |
| "3\t20" |
| }; |
| String[] inputB = { |
| "Total3\tthree", |
| "Total2\ttwo", |
| "Total1\tone" |
| }; |
| |
| // Test the use of scalars in expressions |
| String inputPathA = BUILD_TEST_TMP+"table_testScalarAliasesJoinClauseA"; |
| TestScalarAliases.createLocalInputFile(inputPathA, inputA); |
| String inputPathB = BUILD_TEST_TMP+"table_testScalarAliasesJoinClauseB"; |
| TestScalarAliases.createLocalInputFile(inputPathB, inputB); |
| // Test in script mode |
| pigServer.registerQuery("A = LOAD '" + inputPathA + "' as (a0, a1);"); |
| pigServer.registerQuery("G = group A all;"); |
| pigServer.registerQuery("C = foreach G generate COUNT(A) as count;"); |
| |
| pigServer.registerQuery("B = LOAD '" + inputPathB + "' as (b0:chararray, b1:chararray);"); |
| pigServer.registerQuery("Y = join A by CONCAT('Total', (chararray)C.count), B by $0;"); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("Y"); |
| |
| String[] expected = new String[] { |
| "(1,5,Total3,three)", |
| "(2,10,Total3,three)", |
| "(3,20,Total3,three)" |
| }; |
| |
| Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("Y"))); |
| } |
| |
| // See PIG-1434 |
| @Test |
| public void testScalarAliasesFilterClause() throws Exception{ |
| String[] input = { |
| "1\t5", |
| "2\t10", |
| "3\t20", |
| "4\t12", |
| "5\t8" |
| }; |
| |
| // Test the use of scalars in expressions |
| String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesFilterClause"; |
| TestScalarAliases.createLocalInputFile(inputPath, input); |
| // Test in script mode |
| pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0, a1);"); |
| pigServer.registerQuery("G = group A all;"); |
| pigServer.registerQuery("C = foreach G generate AVG(A.$1) as average;"); |
| |
| pigServer.registerQuery("Y = filter A by a1 > C.average;"); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("Y"); |
| |
| // Average is 11 |
| Tuple t = iter.next(); |
| assertTrue(t.toString().equals("(3,20)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(4,12)")); |
| |
| assertFalse(iter.hasNext()); |
| } |
| |
| // See PIG-1434 |
| @Test |
| public void testScalarAliasesGrammarNegative() throws Exception{ |
| String[] input = { |
| "1\t5", |
| "2\t10", |
| "3\t20" |
| }; |
| |
| String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesGrammar"; |
| TestScalarAliases.createLocalInputFile(inputPath, input); |
| |
| try { |
| pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0: long, a1: double);"); |
| pigServer.registerQuery("B = group A all;"); |
| pigServer.registerQuery("C = foreach B generate COUNT(A);"); |
| // Only projections of C are supported |
| pigServer.registerQuery("Y = foreach A generate C;"); |
| pigServer.openIterator( "Y" ); |
| //Control should not reach here |
| fail("Scalar projections are only supported"); |
| } catch (IOException pe){ |
| assertTrue(pe.getMessage().contains("Invalid scalar projection: C")); |
| } |
| } |
| |
| // See PIG-1636 |
| @Test |
| public void testScalarAliasesLimit() throws Exception{ |
| String[] input = { |
| "a\t1", |
| "b\t2", |
| "c\t3", |
| "a\t4", |
| "c\t5" |
| }; |
| |
| // Test the use of scalars in expressions |
| String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesLimit"; |
| TestScalarAliases.createLocalInputFile(inputPath, input); |
| // Test in script mode |
| pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0:chararray, a1: int);"); |
| pigServer.registerQuery("G = group A all;"); |
| pigServer.registerQuery("C = foreach G generate SUM(A.$1) as total;"); |
| pigServer.registerQuery("C1 = limit C 1;"); |
| pigServer.registerQuery("Y = foreach A generate a0, a1 * (double)C1.total;"); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("Y"); |
| |
| // Average is 11 |
| Tuple t = iter.next(); |
| assertTrue(t.toString().equals("(a,15.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(b,30.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(c,45.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(a,60.0)")); |
| |
| t = iter.next(); |
| assertTrue(t.toString().equals("(c,75.0)")); |
| |
| assertFalse(iter.hasNext()); |
| } |
| |
| /** |
| * Test that a specific string is included in the error message when an |
| * exception is thrown for using a relation in a |
| * scalar context without projecting any columns out of it |
| */ |
| // See PIG-1788 |
| @Test |
| public void testScalarWithNoProjection() throws Exception{ |
| String query = |
| " A = load 'table_testScalarWithNoProjection' as (x, y);" + |
| " B = group A by x;" + |
| // B is unintentionally being used as scalar, |
| // the user intends it to be COUNT(A) |
| " C = foreach B generate COUNT(B);"; |
| |
| Util.checkExceptionMessage(query, "C", |
| "A column needs to be projected from a relation" + |
| " for it to be used as a scalar" |
| ); |
| } |
| |
| @Test |
| public void testScalarNullValue() throws Exception{ |
| Storage.Data data = Storage.resetData(pigServer); |
| data.set("input", Storage.tuple("a", 1), Storage.tuple("b", 2)); |
| |
| pigServer.setBatchOn(); |
| pigServer.registerQuery("A = load 'input' using mock.Storage() as (a:chararray, b:int);"); |
| pigServer.registerQuery("B = FILTER A by a == 'c';"); |
| pigServer.registerQuery("C = FOREACH A generate a, b + B.b;"); |
| pigServer.registerQuery("store C into 'output' using mock.Storage();"); |
| |
| pigServer.executeBatch(); |
| |
| List<Tuple> actualResults = data.get("output"); |
| List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( |
| new String[] {"('a', null)", "('b', null)"}); |
| Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults); |
| |
| } |
| |
| @Test |
| public void testScalarNullValue2() throws Exception{ |
| Storage.Data data = Storage.resetData(pigServer); |
| data.set("input", Storage.tuple("a", 1), Storage.tuple("b", 2)); |
| |
| pigServer.registerQuery("A = load 'input' using mock.Storage() as (a:chararray, b:int);"); |
| pigServer.registerQuery("B = FILTER A by a == 'c';"); |
| pigServer.registerQuery("C = GROUP B ALL;"); |
| pigServer.registerQuery("D = FOREACH C GENERATE COUNT(B.b) as count;"); |
| pigServer.registerQuery("E = FOREACH A GENERATE (D.count IS NOT NULL? D.count : 0l);;"); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("E"); |
| Tuple t = iter.next(); |
| assertTrue(t.toString().equals("(0)")); |
| } |
| } |