blob: 22c202598cc92e5853edc251e796836d35670079 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import java.io.File;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.Properties;
import junit.framework.TestCase;
import org.junit.Test;
import org.apache.pig.FilterFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigServer;
import org.apache.pig.EvalFunc;
import org.apache.pig.StoreFunc;
import org.apache.pig.builtin.*;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataAtom;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataMap;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.builtin.ShellBagEvalFunc;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import static org.apache.pig.PigServer.ExecType.LOCAL;
public class TestBuiltin extends TestCase {
// Builtin MATH Functions
// =======================
/**
 * Checks that {@code AVG}, applied to the integers 1 through 10 (wrapped by
 * {@code Util.loadNestTuple}), writes 5.5 into the output atom.
 */
@Test
public void testAVG() throws Exception {
    int[] input = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    double expected = 5.5;
    EvalFunc<DataAtom> avg = new AVG();
    Tuple tup = Util.loadNestTuple(new Tuple(1), input);
    DataAtom output = new DataAtom();
    avg.exec(tup, output);
    // Parse directly to a primitive instead of allocating a boxed Double.
    double actual = Double.parseDouble(output.strval());
    // A zero delta keeps the comparison exact while still reporting both
    // values when the assertion fails.
    assertEquals("Expected average to be 5.5", expected, actual, 0.0);
}
/**
 * Runs {@code AVG.Initial} over the integers 1 through 10 and checks the
 * partial result tuple: field 0 must be the sum 55.0 and field 1 the count 10.
 */
@Test
public void testAVGInitial() throws Exception {
    final int[] values = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    final EvalFunc<Tuple> initial = new AVG.Initial();
    final Tuple nested = Util.loadNestTuple(new Tuple(1), values);
    final Tuple partial = new Tuple();

    initial.exec(nested, partial);

    // Field 0 carries the running sum.
    assertEquals("Expected sum to be 55.0", 55.0, partial.getAtomField(0).numval());
    // Field 1 carries the number of values seen.
    assertEquals("Expected count to be 10", 10, partial.getAtomField(1).longVal());
}
/**
 * Feeds {@code AVG.Final} a bag of three (sum, count) tuples -- (55.0, 10),
 * (28.0, 7) and (82.0, 17) -- and checks that the combined average is
 * 165 / 34 = 4.852941176470588.
 */
@Test
public void testAVGFinal() throws Exception {
    final Tuple first = new Tuple(2);
    first.setField(0, 55.0);
    first.setField(1, 10);

    final Tuple second = new Tuple(2);
    second.setField(0, 28.0);
    second.setField(1, 7);

    final Tuple third = new Tuple(2);
    third.setField(0, 82.0);
    third.setField(1, 17);

    final DataBag partials = BagFactory.getInstance().newDefaultBag();
    for (Tuple partial : new Tuple[] { first, second, third }) {
        partials.add(partial);
    }

    final Tuple wrapped = new Tuple(partials);
    final EvalFunc<DataAtom> finalAvg = new AVG.Final();
    final DataAtom result = new DataAtom();
    finalAvg.exec(wrapped, result);

    assertEquals("Expected average to be 4.852941176470588",
            4.852941176470588, result.numval());
}
/**
 * Checks that {@code COUNT}, applied to ten integers (wrapped by
 * {@code Util.loadNestTuple}), writes the number of inputs into the output atom.
 */
@Test
public void testCOUNT() throws Exception {
    int[] input = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    double expected = input.length;
    EvalFunc<DataAtom> count = new COUNT();
    Tuple tup = Util.loadNestTuple(new Tuple(1), input);
    DataAtom output = new DataAtom();
    count.exec(tup, output);
    // Parse directly to a primitive instead of allocating a boxed Double.
    double actual = Double.parseDouble(output.strval());
    // A zero delta keeps the comparison exact while still reporting both
    // values when the assertion fails.
    assertEquals("Expected count to be 10", expected, actual, 0.0);
}
/**
 * Exercises {@code COUNT} and {@code IsEmpty} against a tuple holding a
 * {@code DataMap}: the map starts empty (count 0, isEmpty true) and is then
 * grown to one and two entries, re-checking both functions after each put.
 */
@Test
public void testCOUNTMap() throws Exception {
    final DataMap entries = new DataMap();
    final Tuple holder = new Tuple();
    holder.appendField(entries);

    final DataAtom result = new DataAtom();
    final EvalFunc<DataAtom> counter = new COUNT();
    final FilterFunc emptiness = new IsEmpty();

    // No entries yet.
    assertTrue(emptiness.exec(holder));
    counter.exec(holder, result);
    assertTrue(result.numval() == 0);

    // One atom-valued entry.
    entries.put("a", new DataAtom("a"));
    assertFalse(emptiness.exec(holder));
    counter.exec(holder, result);
    assertTrue(result.numval() == 1);

    // A second, tuple-valued entry.
    entries.put("b", new Tuple());
    assertFalse(emptiness.exec(holder));
    counter.exec(holder, result);
    assertTrue(result.numval() == 2);
}
/**
 * Runs {@code COUNT.Initial} over ten integers and checks that field 0 of the
 * partial result tuple holds the count 10.
 */
@Test
public void testCOUNTInitial() throws Exception {
    final int[] values = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    final EvalFunc<Tuple> initial = new COUNT.Initial();
    final Tuple nested = Util.loadNestTuple(new Tuple(1), values);
    final Tuple partial = new Tuple();

    initial.exec(nested, partial);

    assertEquals("Expected count to be 10", 10, partial.getAtomField(0).longVal());
}
/**
 * Runs {@code COUNT.Final} over the values 23, 38 and 39 and expects 100
 * (23 + 38 + 39) as the final count.
 */
@Test
public void testCOUNTFinal() throws Exception {
    final int[] partialCounts = { 23, 38, 39 };
    final Tuple nested = Util.loadNestTuple(new Tuple(1), partialCounts);
    final EvalFunc<DataAtom> finalCount = new COUNT.Final();
    final DataAtom result = new DataAtom();

    finalCount.exec(nested, result);

    assertEquals("Expected count to be 100", 100, result.longVal());
}
/**
 * Checks that {@code SUM}, applied to the integers 1 through 10 (wrapped by
 * {@code Util.loadNestTuple}), writes 55 into the output atom.
 */
@Test
public void testSUM() throws Exception {
    int[] input = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    double expected = 55;
    EvalFunc<DataAtom> sum = new SUM();
    Tuple tup = Util.loadNestTuple(new Tuple(1), input);
    DataAtom output = new DataAtom();
    sum.exec(tup, output);
    // Parse directly to a primitive instead of allocating a boxed Double.
    double actual = Double.parseDouble(output.strval());
    // A zero delta keeps the comparison exact while still reporting both
    // values when the assertion fails.
    assertEquals("Expected sum to be 55.0", expected, actual, 0.0);
}
/**
 * Runs {@code SUM.Initial} over the integers 1 through 10 and checks that
 * field 0 of the partial result tuple holds the sum 55.0.
 */
@Test
public void testSUMInitial() throws Exception {
    final int[] values = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    final EvalFunc<Tuple> initial = new SUM.Initial();
    final Tuple nested = Util.loadNestTuple(new Tuple(1), values);
    final Tuple partial = new Tuple();

    initial.exec(nested, partial);

    assertEquals("Expected sum to be 55.0", 55.0, partial.getAtomField(0).numval());
}
/**
 * Runs {@code SUM.Final} over the values 23, 38 and 39 and expects their
 * total, 100.0.
 */
@Test
public void testSUMFinal() throws Exception {
    final int[] partialSums = { 23, 38, 39 };
    final Tuple nested = Util.loadNestTuple(new Tuple(1), partialSums);
    final EvalFunc<DataAtom> finalSum = new SUM.Final();
    final DataAtom result = new DataAtom();

    finalSum.exec(nested, result);

    assertEquals("Expected sum to be 100.0", 100.0, result.numval());
}
/**
 * Checks that {@code MIN} over the integers 1 through 10 yields 1.0.
 */
@Test
public void testMIN() throws Exception {
    final int[] values = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    final EvalFunc<DataAtom> minimum = new MIN();
    final Tuple nested = Util.loadNestTuple(new Tuple(1), values);
    final DataAtom result = new DataAtom();

    minimum.exec(nested, result);

    assertEquals("Expected min to be 1.0", 1.0, result.numval());
}
/**
 * Runs {@code MIN.Initial} over the integers 1 through 10 and checks that
 * field 0 of the partial result tuple holds 1.0.
 */
@Test
public void testMINInitial() throws Exception {
    final int[] values = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    final EvalFunc<Tuple> initial = new MIN.Initial();
    final Tuple nested = Util.loadNestTuple(new Tuple(1), values);
    final Tuple partial = new Tuple();

    initial.exec(nested, partial);

    assertEquals("Expected min to be 1.0", 1.0, partial.getAtomField(0).numval());
}
/**
 * Runs {@code MIN.Final} over the values 23, 38 and 39 and expects the
 * smallest of them, 23.0.
 */
@Test
public void testMINFinal() throws Exception {
    int[] input = { 23, 38, 39 };
    Tuple tup = Util.loadNestTuple(new Tuple(1), input);
    EvalFunc<DataAtom> min = new MIN.Final();
    DataAtom output = new DataAtom();
    min.exec(tup, output);
    // The message previously said "sum" (copy-paste from the SUM tests).
    assertEquals("Expected min to be 23.0", 23.0, output.numval());
}
/**
 * Checks that {@code MAX} over the integers 1 through 10 yields 10.0.
 */
@Test
public void testMAX() throws Exception {
    final int[] values = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    final EvalFunc<DataAtom> maximum = new MAX();
    final Tuple nested = Util.loadNestTuple(new Tuple(1), values);
    final DataAtom result = new DataAtom();

    maximum.exec(nested, result);

    assertEquals("Expected max to be 10.0", 10.0, result.numval());
}
/**
 * Runs {@code MAX.Initial} over the integers 1 through 10 and checks that
 * field 0 of the partial result tuple holds 10.0.
 */
@Test
public void testMAXInitial() throws Exception {
    final int[] values = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    final EvalFunc<Tuple> initial = new MAX.Initial();
    final Tuple nested = Util.loadNestTuple(new Tuple(1), values);
    final Tuple partial = new Tuple();

    initial.exec(nested, partial);

    assertEquals("Expected max to be 10.0", 10.0, partial.getAtomField(0).numval());
}
/**
 * Runs {@code MAX.Final} over the values 23, 38 and 39 and expects the
 * largest of them, 39.0.
 */
@Test
public void testMAXFinal() throws Exception {
    int[] input = { 23, 38, 39 };
    Tuple tup = Util.loadNestTuple(new Tuple(1), input);
    EvalFunc<DataAtom> max = new MAX.Final();
    DataAtom output = new DataAtom();
    max.exec(tup, output);
    // The message previously said "sum" (copy-paste from the SUM tests).
    assertEquals("Expected max to be 39.0", 39.0, output.numval());
}
// Builtin APPLY Functions
// ========================
// Builtin LOAD Functions
// =======================
/**
 * Exercises {@code PigStorage} as a {@code LoadFunc} in four scenarios:
 * <ol>
 *   <li>a colon-delimited line with six fields;</li>
 *   <li>1024 * 1024 tab-delimited lines of three fields each, read until
 *       {@code getNext()} returns {@code null};</li>
 *   <li>a line with a leading delimiter (expected arity 6);</li>
 *   <li>a line with a trailing delimiter (expected arity 6).</li>
 * </ol>
 */
@Test
public void testLFPig() throws Exception {
    // 1. Plain colon-delimited line.
    String input1 = "this:is:delimited:by:a:colon\n";
    int arity1 = 6;
    byte[] bytes1 = input1.getBytes();
    LoadFunc p1 = new PigStorage(":");
    FakeFSInputStream ffis1 = new FakeFSInputStream(bytes1);
    p1.bindTo(null, new BufferedPositionedInputStream(ffis1), 0, bytes1.length);
    Tuple f1 = p1.getNext();
    assertEquals(arity1, f1.arity());

    // 2. Large tab-delimited input read with the default delimiter.
    LoadFunc p15 = new PigStorage();
    StringBuilder sb = new StringBuilder();
    final int loopCount = 1024;
    for (int i = 0; i < loopCount; i++) {
        for (int j = 0; j < loopCount; j++) {
            sb.append(i);
            sb.append("\t");
            sb.append(i);
            sb.append("\t");
            sb.append(j % 2);
            sb.append("\n");
        }
    }
    byte[] bytes = sb.toString().getBytes();
    FakeFSInputStream ffis15 = new FakeFSInputStream(bytes);
    p15.bindTo(null, new BufferedPositionedInputStream(ffis15), 0, bytes.length);
    int count = 0;
    while (true) {
        Tuple f15 = p15.getNext();
        if (f15 == null) {
            break;
        }
        count++;
        assertEquals(3, f15.arity());
    }
    assertEquals(loopCount * loopCount, count);

    // 3. Leading delimiter.
    String input2 = ":this:has:a:leading:colon\n";
    int arity2 = 6;
    byte[] bytes2 = input2.getBytes();
    LoadFunc p2 = new PigStorage(":");
    FakeFSInputStream ffis2 = new FakeFSInputStream(bytes2);
    p2.bindTo(null, new BufferedPositionedInputStream(ffis2), 0, bytes2.length);
    Tuple f2 = p2.getNext();
    assertEquals(arity2, f2.arity());

    // 4. Trailing delimiter.
    String input3 = "this:has:a:trailing:colon:\n";
    int arity3 = 6;
    byte[] bytes3 = input3.getBytes();
    LoadFunc p3 = new PigStorage(":");
    FakeFSInputStream ffis3 = new FakeFSInputStream(bytes3);
    // Bind with input3's own length; this previously used input1's length,
    // which is longer than the data actually backing the stream.
    p3.bindTo(null, new BufferedPositionedInputStream(ffis3), 0, bytes3.length);
    Tuple f3 = p3.getNext();
    assertEquals(arity3, f3.arity());
}
/*
@Test
public void testLFBin() throws Exception {
BagFactory.init(new File("/tmp"));
Tuple t1 = new Tuple(4);
DataAtom a = new DataAtom("a");
DataAtom b = new DataAtom("b");
Tuple t2 = new Tuple(1);
t2.setField(0,a);
Tuple t3 = new Tuple(1);
t3.setField(0, b);
DataBag bag = BagFactory.getInstance().getNewBigBag();
bag.add(t2);
bag.add(t3);
Tuple t4 = new Tuple(2);
t4.setField(0, t2);
t4.setField(1, t3);
t1.setField(0, a);
t1.setField(1, t2);
t1.setField(2, bag);
t1.setField(3, t4);
Tuple t5 = new Tuple(4);
DataAtom c = new DataAtom("the quick brown fox");
DataAtom d = new DataAtom("jumps over the lazy dog");
Tuple t6 = new Tuple(1);
t6.setField(0,c);
Tuple t7 = new Tuple(1);
t7.setField(0, d);
DataBag bag2 = BagFactory.getInstance().getNewBigBag();
for(int i = 0; i < 10; i ++) {
bag2.add(t6);
bag2.add(t7);
}
Tuple t8 = new Tuple(2);
t8.setField(0, t6);
t8.setField(1, t7);
t5.setField(0, c);
t5.setField(1, t6);
t5.setField(2, bag2);
t5.setField(3, t8);
OutputStream os = new FileOutputStream("/tmp/bintest.bin");
StoreFunc s = new BinStorage();
s.bindTo(os);
s.putNext(t1);
s.putNext(t5);
s.finish();
LoadFunc l = new BinStorage();
InputStream is = FileLocalizer.open("/tmp/bintest.bin", new PigContext(ExecType.LOCAL));
l.bindTo("/tmp/bintest.bin", new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
Tuple r1 = l.getNext();
Tuple r2 = l.getNext();
assertTrue(r1.equals(t1));
assertTrue(r2.equals(t5));
}
*/
/**
 * Exercises {@code TextLoader}: a two-line input must come back as two tuples
 * whose first atom field is the respective line (without the newline), and an
 * empty input must produce no tuple at all.
 */
@Test
public void testLFText() throws Exception {
    String input1 = "This is some text.\nWith a newline in it.\n";
    String expected1 = "This is some text.";
    String expected2 = "With a newline in it.";
    byte[] bytes1 = input1.getBytes();
    FakeFSInputStream ffis1 = new FakeFSInputStream(bytes1);
    LoadFunc text1 = new TextLoader();
    text1.bindTo(null, new BufferedPositionedInputStream(ffis1), 0, bytes1.length);
    Tuple f1 = text1.getNext();
    Tuple f2 = text1.getNext();
    // Separate assertions so a failure names the offending line and shows
    // the value that was actually loaded.
    assertEquals("First line mismatch", expected1, f1.getAtomField(0).strval());
    assertEquals("Second line mismatch", expected2, f2.getAtomField(0).strval());

    String input2 = "";
    byte[] bytes2 = input2.getBytes();
    FakeFSInputStream ffis2 = new FakeFSInputStream(bytes2);
    LoadFunc text2 = new TextLoader();
    text2.bindTo(null, new BufferedPositionedInputStream(ffis2), 0, bytes2.length);
    Tuple f3 = text2.getNext();
    assertNull("Empty input should not produce a tuple", f3);
}
/**
 * Round-trips a flat ten-field tuple through {@code PigStorage}: the tuple is
 * stored with a tab delimiter into an in-memory buffer, then loaded back from
 * that same buffer and compared with the original.
 */
@Test
public void testSFPig() throws Exception {
    final byte[] buffer = new byte[1024];

    // Store phase.
    final FakeFSOutputStream sink = new FakeFSOutputStream(buffer);
    final StoreFunc storer = new PigStorage("\t");
    storer.bindTo(sink);
    final int[] values = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    final Tuple written = Util.loadFlatTuple(new Tuple(values.length), values);
    storer.putNext(written);
    storer.finish();

    // Load phase, reading back from the buffer that was just filled.
    final FakeFSInputStream source = new FakeFSInputStream(buffer);
    final LoadFunc loader = new PigStorage();
    loader.bindTo(null, new BufferedPositionedInputStream(source), 0, buffer.length);
    final Tuple readBack = loader.getNext();

    assertTrue(written.equals(readBack));
}
/**
 * Registers {@code ShellBagEvalFunc('tr o 0')} as {@code myTr}, applies it to
 * a two-line temp file ("foo", "boo") through a local {@code PigServer}, and
 * expects exactly the two rows "f00" and "b00".
 */
@Test
public void testShellFuncSingle() throws Throwable {
    PigServer pig = new PigServer(LOCAL, new Properties());
    File tempFile = File.createTempFile("tmp", ".dat");
    try {
        PrintWriter writer = new PrintWriter(tempFile);
        try {
            writer.println("foo");
            writer.println("boo");
        } finally {
            // Close even if writing fails so the file handle is not leaked.
            writer.close();
        }
        pig.registerFunction("myTr", ShellBagEvalFunc.class.getName() + "('tr o 0')");
        pig.registerQuery("a = load 'file:" + Util.encodeEscape(tempFile.toString()) + "';");
        pig.registerQuery("b = foreach a generate myTr(*);");
        Iterator<Tuple> iter = pig.openIterator("b");
        Tuple t;
        assertTrue(iter.hasNext());
        t = iter.next();
        assertEquals("f00", t.getAtomField(0).strval());
        assertTrue(iter.hasNext());
        t = iter.next();
        assertEquals("b00", t.getAtomField(0).strval());
        assertFalse(iter.hasNext());
    } finally {
        // Remove the temp file even when a query or an assertion fails.
        tempFile.delete();
    }
}
/**
 * Registers two {@code ShellBagEvalFunc} instances ({@code tr o A} and
 * {@code tr o B}), applies both in one {@code foreach} to a 100-line temp file
 * whose lines are {@code i + "oo"}, and expects row {@code i} to carry
 * {@code i + "AA"} in the first bag and {@code i + "BB"} in the second.
 */
@Test
public void testShellFuncMultiple() throws Throwable {
    PigServer pig = new PigServer(LOCAL, new Properties());
    final int numTimes = 100;
    File tempFile = File.createTempFile("tmp", ".dat");
    try {
        PrintWriter writer = new PrintWriter(tempFile);
        try {
            for (int i = 0; i < numTimes; i++) {
                writer.println(i + "oo");
            }
        } finally {
            // Close even if writing fails so the file handle is not leaked.
            writer.close();
        }
        pig.registerFunction("tr1", ShellBagEvalFunc.class.getName() + "('tr o A')");
        pig.registerFunction("tr2", ShellBagEvalFunc.class.getName() + "('tr o B')");
        pig.registerQuery("a = load 'file:" + Util.encodeEscape(tempFile.toString()) + "';");
        pig.registerQuery("b = foreach a generate tr1(*),tr2(*);");
        Iterator<Tuple> iter = pig.openIterator("b");
        for (int i = 0; i < numTimes; i++) {
            // Fail with a readable message rather than NoSuchElementException
            // when fewer rows than expected come back.
            assertTrue("Missing output row " + i, iter.hasNext());
            Tuple t = iter.next();
            assertEquals(i + "AA", t.getBagField(0).iterator().next().getAtomField(0).strval());
            assertEquals(i + "BB", t.getBagField(1).iterator().next().getAtomField(0).strval());
        }
        assertFalse(iter.hasNext());
    } finally {
        // Remove the temp file even when a query or an assertion fails.
        tempFile.delete();
    }
}
}