/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.impl.builtin;
import static org.apache.pig.builtin.mock.Storage.resetData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Iterator;
import java.util.List;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.Util;
import org.apache.pig.test.junit.OrderedJUnit4Runner;
import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
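/**
 * End-to-end tests for Python streaming UDFs (streaming_python): scalar,
 * bytearray, biginteger, bigdecimal, datetime, and complex-type round trips,
 * run both in local mode and on a mini cluster.
 */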
// Need to run testPythonUDF_onCluster first due to TEZ-1802
@RunWith(OrderedJUnit4Runner.class)
@TestOrder({
"testPythonUDF_onCluster",
"testPythonUDF_withBytearrayAndBytes_onCluster",
"testPythonUDF__allTypes",
"testPythonUDF__withBigDecimal",
"testPythonUDF",
"testPythonUDF__withBigInteger",
"testPythonUDF__withDateTime",
"testPythonUDF_withNewline"
})
public class TestStreamingUDF {
private static PigServer pigServerLocal = null;
private static PigServer pigServerMapReduce = null;
private TupleFactory tf = TupleFactory.getInstance();
private static MiniGenericCluster cluster;
@BeforeClass
public static void oneTimeSetup() {
cluster = MiniGenericCluster.buildCluster();
}
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
@Before
public void setUp() throws Exception {
Util.resetStateForExecModeSwitch();
}
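
    // Runs a simple two-argument Python UDF on the minicluster and checks that
    // the concatenated chararray output matches the two input fields.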
@Test
public void testPythonUDF_onCluster() throws Exception {
pigServerMapReduce = new PigServer(cluster.getExecType(), cluster.getProperties());
String[] pythonScript = {
"from pig_util import outputSchema",
"@outputSchema(\'c:chararray\')",
"def py_func(one,two):",
" return one + two"
};
Util.createLocalInputFile("pyfile_mr.py", pythonScript);
String[] input = {
"field10\tfield11",
"field20\tfield21"
};
Util.createLocalInputFile("input_mr", input);
Util.copyFromLocalToCluster(cluster, "input_mr", "input_mr");
pigServerMapReduce.registerQuery("REGISTER 'pyfile_mr.py' USING streaming_python AS pf;");
pigServerMapReduce.registerQuery("A = LOAD 'input_mr' as (c1:chararray, c2:chararray);");
pigServerMapReduce.registerQuery("B = FOREACH A generate pf.py_func(c1, c2);");
Iterator<Tuple> iter = pigServerMapReduce.openIterator("B");
assertTrue(iter.hasNext());
Tuple actual0 = iter.next();
assertTrue(iter.hasNext());
Tuple actual1 = iter.next();
Tuple expected0 = tf.newTuple("field10field11");
Tuple expected1 = tf.newTuple("field20field21");
assertEquals(expected0, actual0);
assertEquals(expected1, actual1);
}
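
    // Runs a Python UDF that returns 1000 random bytes on the minicluster; we
    // mainly check that the result comes back as a DataByteArray of that size.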
@Test
public void testPythonUDF_withBytearrayAndBytes_onCluster() throws Exception {
pigServerMapReduce = new PigServer(cluster.getExecType(), cluster.getProperties());
String[] pythonScript = {
"from pig_util import outputSchema",
"import os",
"@outputSchema('f:bytearray')",
"def foo(bar):",
" return bytearray(os.urandom(1000))"
};
        Util.createLocalInputFile("pyfilewBaB.py", pythonScript);
String[] input = {
"field1"
};
Util.createLocalInputFile("testTupleBaB", input);
Util.copyFromLocalToCluster(cluster, "testTupleBaB", "testTupleBaB");
pigServerMapReduce.registerQuery("REGISTER 'pyfilewBaB.py' USING streaming_python AS pf;");
pigServerMapReduce.registerQuery("A = LOAD 'testTupleBaB' as (b:chararray);");
pigServerMapReduce.registerQuery("B = FOREACH A generate pf.foo(b);");
Iterator<Tuple> iter = pigServerMapReduce.openIterator("B");
assertTrue(iter.hasNext());
Object result = iter.next().get(0);
        // Mostly we're happy we got a result without throwing an exception, but
        // we'll do a basic sanity check on the type and size.
assertTrue(result instanceof DataByteArray);
assertEquals(1000, ((DataByteArray)result).size());
}
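
    // Same concatenation UDF as the cluster test, but run in local mode against
    // mock.Storage input and output.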
@Test
public void testPythonUDF() throws Exception {
pigServerLocal = new PigServer(Util.getLocalTestMode());
String[] pythonScript = {
"from pig_util import outputSchema",
"@outputSchema(\'c:chararray\')",
"def py_func(one,two):",
" return one + two"
};
        Util.createLocalInputFile("pyfile.py", pythonScript);
Data data = resetData(pigServerLocal);
Tuple t0 = tf.newTuple(2);
t0.set(0, "field10");
t0.set(1, "field11");
Tuple t1 = tf.newTuple(2);
t1.set(0, "field20");
t1.set(1, "field21");
data.set("testTuples", "c1:chararray,c2:chararray", t0, t1);
pigServerLocal.registerQuery("REGISTER 'pyfile.py' USING streaming_python AS pf;");
pigServerLocal.registerQuery("A = LOAD 'testTuples' USING mock.Storage();");
pigServerLocal.registerQuery("B = FOREACH A generate pf.py_func(c1, c2);");
pigServerLocal.registerQuery("STORE B INTO 'out' USING mock.Storage();");
Tuple expected0 = tf.newTuple("field10field11");
Tuple expected1 = tf.newTuple("field20field21");
List<Tuple> out = data.get("out");
assertEquals(expected0, out.get(0));
assertEquals(expected1, out.get(1));
}
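
    // Verifies that a newline embedded in the UDF's chararray output survives
    // the round trip through the streaming protocol.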
@Test
public void testPythonUDF_withNewline() throws Exception {
pigServerLocal = new PigServer(Util.getLocalTestMode());
String[] pythonScript = {
"from pig_util import outputSchema",
"@outputSchema(\'c:chararray\')",
"def py_func(one,two):",
" return '%s\\n%s' % (one,two)"
};
        Util.createLocalInputFile("pyfileNL.py", pythonScript);
Data data = resetData(pigServerLocal);
Tuple t0 = tf.newTuple(2);
t0.set(0, "field10");
t0.set(1, "field11");
Tuple t1 = tf.newTuple(2);
t1.set(0, "field20");
t1.set(1, "field21");
data.set("testTuplesNL", "c1:chararray,c2:chararray", t0, t1);
pigServerLocal.registerQuery("REGISTER 'pyfileNL.py' USING streaming_python AS pf;");
pigServerLocal.registerQuery("A = LOAD 'testTuplesNL' USING mock.Storage();");
pigServerLocal.registerQuery("B = FOREACH A generate pf.py_func(c1, c2);");
pigServerLocal.registerQuery("STORE B INTO 'outNL' USING mock.Storage();");
Tuple expected0 = tf.newTuple("field10\nfield11");
Tuple expected1 = tf.newTuple("field20\nfield21");
List<Tuple> out = data.get("outNL");
assertEquals(expected0, out.get(0));
assertEquals(expected1, out.get(1));
}
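
    // Round-trips biginteger values through a Python identity UDF and expects
    // them back unchanged.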
@Test
public void testPythonUDF__withBigInteger() throws Exception {
pigServerLocal = new PigServer(Util.getLocalTestMode());
String[] pythonScript = {
"from pig_util import outputSchema",
"@outputSchema(\'biout:biginteger\')",
"def py_func(bi):",
" return bi"
};
        Util.createLocalInputFile("pyfile_bi.py", pythonScript);
Data data = resetData(pigServerLocal);
Tuple t0 = tf.newTuple(new BigInteger("123456789012345678901234567890"));
Tuple t1 = tf.newTuple(new BigInteger("9123456789012345678901234567890"));
data.set("testBiTuples", "bi:biginteger", t0, t1);
pigServerLocal.registerQuery("REGISTER 'pyfile_bi.py' USING streaming_python AS pf;");
pigServerLocal.registerQuery("A = LOAD 'testBiTuples' USING mock.Storage();");
pigServerLocal.registerQuery("B = FOREACH A generate pf.py_func(bi);");
pigServerLocal.registerQuery("STORE B INTO 'bi_out' USING mock.Storage();");
List<Tuple> out = data.get("bi_out");
assertEquals(t0, out.get(0));
assertEquals(t1, out.get(1));
}
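
    // Round-trips bigdecimal values through a Python identity UDF; exact
    // equality is not expected because precision is lost on the Python side, so
    // the assertions compare float approximations with a delta.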
@Test
public void testPythonUDF__withBigDecimal() throws Exception {
pigServerLocal = new PigServer(Util.getLocalTestMode());
String[] pythonScript = {
"from pig_util import outputSchema",
"@outputSchema(\'bdout:bigdecimal\')",
"def py_func(bd):",
" return bd"
};
        Util.createLocalInputFile("pyfile_bd.py", pythonScript);
Data data = resetData(pigServerLocal);
Tuple t0 = tf.newTuple(new BigDecimal("123456789012345678901234567890.12345"));
Tuple t1 = tf.newTuple(new BigDecimal("9123456789012345678901234567890.12345"));
data.set("testBdTuples", "bd:bigdecimal", t0, t1);
pigServerLocal.registerQuery("REGISTER 'pyfile_bd.py' USING streaming_python AS pf;");
pigServerLocal.registerQuery("A = LOAD 'testBdTuples' USING mock.Storage();");
pigServerLocal.registerQuery("B = FOREACH A generate pf.py_func(bd);");
pigServerLocal.registerQuery("STORE B INTO 'bd_out' USING mock.Storage();");
        // We lose precision when the value goes to Python and back, so compare
        // float approximations with a delta instead of exact BigDecimal equality.
        List<Tuple> out = data.get("bd_out");
        float e0 = ((BigDecimal) t0.get(0)).floatValue();
        float e1 = ((BigDecimal) t1.get(0)).floatValue();
        assertEquals(e0, ((BigDecimal) out.get(0).get(0)).floatValue(), 0.1);
        assertEquals(e1, ((BigDecimal) out.get(1).get(0)).floatValue(), 0.1);
}
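
    // Round-trips datetime values through a Python identity UDF and expects
    // them back unchanged.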
@Test
public void testPythonUDF__withDateTime() throws Exception {
pigServerLocal = new PigServer(Util.getLocalTestMode());
String[] pythonScript = {
"from pig_util import outputSchema",
"@outputSchema(\'d:datetime\')",
"def py_func(dt):",
" return dt"
};
        Util.createLocalInputFile("pyfile_dt.py", pythonScript);
Data data = resetData(pigServerLocal);
Tuple t0 = tf.newTuple(new DateTime());
Tuple t1 = tf.newTuple(new DateTime());
data.set("testDateTuples", "d:datetime", t0, t1);
pigServerLocal.registerQuery("REGISTER 'pyfile_dt.py' USING streaming_python AS pf;");
pigServerLocal.registerQuery("A = LOAD 'testDateTuples' USING mock.Storage();");
pigServerLocal.registerQuery("B = FOREACH A generate pf.py_func(d);");
pigServerLocal.registerQuery("STORE B INTO 'date_out' USING mock.Storage();");
List<Tuple> out = data.get("date_out");
assertEquals(t0, out.get(0));
assertEquals(t1, out.get(1));
}
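
    // Exercises a wide range of types in one script: a tuple with null, numeric,
    // chararray (including non-ASCII) and bytearray fields, a map, and a bag,
    // passed through identity-style UDFs and checked against the expected first
    // output tuple documented below.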
@Test
public void testPythonUDF__allTypes() throws Exception {
pigServerLocal = new PigServer(Util.getLocalTestMode());
String[] pythonScript = {
"# -*- coding: utf-8 -*-",
"from pig_util import outputSchema",
"import sys",
"",
"@outputSchema('tuple_output:tuple(nully:chararray, inty:int, longy:long, floaty:float, doubly:double, chararrayy:chararray, utf_chararray_basic_string:chararray, utf_chararray_unicode:chararray, bytearrayy:bytearray)')",
"def get_tuple_output():",
" result = (None, 32, 1000000099990000L, 32.0, 3200.1234678509, 'Some String', 'Hello\\u2026Hello', u'Hello\\u2026Hello', b'Some Byte Array')",
" return result",
"",
"@outputSchema('tuple_output:tuple(nully:chararray, inty:int, longy:long, floaty:float, doubly:double, chararrayy:chararray, utf_chararray_basic_string:chararray, utf_chararray_unicode:chararray, bytearrayy:bytearray)')",
"def crazy_tuple_identity(ct):",
" print ct",
" return ct",
"",
"@outputSchema('mappy:map[]')",
"def add_map():",
" return {u'Weird\\u2026Name' : u'Weird \\u2026 Value',",
" 'SomeNum' : 32,",
" 'Simple Name' : 'Simple Value' }",
"",
"",
"@outputSchema('silly:chararray')",
"def silly(silly_word):",
" print silly_word.encode('utf-8')",
" return silly_word",
"",
"",
"@outputSchema('final_output:bag{t:tuple(user_id:chararray,age:int,tuple_output:tuple(nully:chararray,inty:int,longy:long,floaty:float,doubly:double,chararrayy:chararray,utf_chararray_basic_string:chararray,utf_chararray_unicode:chararray,bytearrayy:bytearray),mappy:map[],silly:chararray)}')",
"def bag_identity(bag):",
" return bag"
};
Util.createLocalInputFile("allfile.py", pythonScript);
Data data = resetData(pigServerLocal);
Tuple t0 = tf.newTuple(2);
t0.set(0, "user1");
t0.set(1, 10);
Tuple t1 = tf.newTuple(2);
t1.set(0, "user2");
t1.set(1, 11);
data.set("userTuples", "user_id:chararray,age:int", t0, t1);
pigServerLocal.registerQuery("REGISTER 'allfile.py' USING streaming_python AS pf;");
pigServerLocal.registerQuery("users = LOAD 'userTuples' USING mock.Storage();");
pigServerLocal.registerQuery("crazy_tuple = FOREACH users GENERATE user_id, age, pf.get_tuple_output();");
pigServerLocal.registerQuery("crazy_tuple_fun = FOREACH crazy_tuple GENERATE user_id, age, pf.crazy_tuple_identity(tuple_output);");
pigServerLocal.registerQuery("crazy_tuple_with_map = FOREACH crazy_tuple_fun GENERATE user_id, age, tuple_output, pf.add_map(), pf.silly('\u2026');");
pigServerLocal.registerQuery("crazy_group = GROUP crazy_tuple_with_map BY (age);");
pigServerLocal.registerQuery("out = FOREACH crazy_group GENERATE group, pf.bag_identity(crazy_tuple_with_map);");
pigServerLocal.registerQuery("STORE out INTO 'all_out' USING mock.Storage();");
List<Tuple> out = data.get("all_out");
/*
* Expected output for first tuple.
* (10,
* {(user1,10,(,32,1000000099990000,32.0,3200.12346785,Some String,Hello\u2026Hello,Hello\u2026Hello,Some Byte Array),
* [Simple Name#Simple Value,SomeNum#32,Weird\u2026Name#Weird \u2026 Value],\u2026)
* })
*/
        // Get the bag from the grouped output
        DataBag bag = (DataBag) out.get(0).get(1);
        assertEquals(1, bag.size());
        // Get the first tuple in the bag
        Tuple innerTuple = bag.iterator().next();
        assertEquals(5, innerTuple.size());
        // Check one field of the innermost tuple
        assertEquals("Hello\\u2026Hello", ((Tuple) innerTuple.get(2)).get(6));
}
}