blob: 151cc1e864581a2cffebd771bcd36caae218a103 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.StringTokenizer;
import org.junit.Test;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigServer;
import org.apache.pig.PigServer.ExecType;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.TextLoader;
import org.apache.pig.data.*;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.PigFile;
import static org.apache.pig.PigServer.ExecType.MAPREDUCE;
import org.apache.pig.backend.executionengine.ExecException;
import junit.framework.TestCase;
public class TestEvalPipeline extends PigExecTestCase {
static public class MyBagFunction extends EvalFunc<DataBag>{
@Override
public void exec(Tuple input, DataBag output) throws IOException {
output.add(new Tuple("a"));
output.add(new Tuple("a"));
output.add(new Tuple("a"));
}
}
private File createFile(String[] data) throws Exception{
File f = File.createTempFile("tmp", "");
PrintWriter pw = new PrintWriter(f);
for (int i=0; i<data.length; i++){
pw.println(data[i]);
}
pw.close();
return f;
}
@Test
public void testFunctionInsideFunction() throws Throwable {
File f1 = createFile(new String[]{"a:1","b:1","a:1"});
pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(f1.toString()) + "' using " + PigStorage.class.getName() + "(':');");
pigServer.registerQuery("b = foreach a generate '1'-'1'/'1';");
Iterator<Tuple> iter = pigServer.openIterator("b");
for (int i=0 ;i<3; i++){
assertEquals(iter.next().getAtomField(0).numval(), 0.0);
}
}
@Test
public void testJoin() throws Throwable {
File f1 = createFile(new String[]{"a:1","b:1","a:1"});
File f2 = createFile(new String[]{"b","b","a"});
pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(f1.toString()) + "' using " + PigStorage.class.getName() + "(':');");
pigServer.registerQuery("b = load 'file:" + Util.encodeEscape(f2.toString()) + "';");
pigServer.registerQuery("c = cogroup a by $0, b by $0;");
pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);");
Iterator<Tuple> iter = pigServer.openIterator("d");
int count = 0;
while(iter.hasNext()){
Tuple t = iter.next();
assertTrue(t.getAtomField(0).strval().equals(t.getAtomField(2).strval()));
count++;
}
assertEquals(count, 4);
}
@Test
public void testDriverMethod() throws Throwable {
File f = File.createTempFile("tmp", "");
PrintWriter pw = new PrintWriter(f);
pw.println("a");
pw.println("a");
pw.close();
pigServer.registerQuery("a = foreach (load 'file:" + Util.encodeEscape(f.toString()) + "') generate '1', flatten(" + MyBagFunction.class.getName() + "(*));");
pigServer.registerQuery("b = foreach a generate $0, flatten($1);");
Iterator<Tuple> iter = pigServer.openIterator("a");
int count = 0;
while(iter.hasNext()){
Tuple t = iter.next();
assertTrue(t.getAtomField(0).strval().equals("1"));
assertTrue(t.getAtomField(1).strval().equals("a"));
count++;
}
assertEquals(count, 6);
f.delete();
}
@Test
public void testMapLookup() throws Throwable {
DataBag b = BagFactory.getInstance().newDefaultBag();
DataMap colors = new DataMap();
colors.put("apple","red");
colors.put("orange","orange");
DataMap weights = new DataMap();
weights.put("apple","0.1");
weights.put("orange","0.3");
Tuple t = new Tuple();
t.appendField(colors);
t.appendField(weights);
b.add(t);
String fileName = "file:"+File.createTempFile("tmp", "");
PigFile f = new PigFile(fileName);
f.store(b, new BinStorage(), pigServer.getPigContext());
pigServer.registerQuery("a = load '" + Util.encodeEscape(fileName.toString()) + "' using BinStorage();");
pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');");
Iterator<Tuple> iter = pigServer.openIterator("b");
t = iter.next();
assertEquals(t.getAtomField(0).strval(), "red");
assertEquals(t.getAtomField(1).numval(), 0.3);
assertFalse(iter.hasNext());
}
static public class TitleNGrams extends EvalFunc<DataBag> {
@Override
public void exec(Tuple input, DataBag output) throws IOException {
String str = input.getAtomField(0).strval();
String title = str;
if (title != null) {
List<String> nGrams = makeNGrams(title);
for (Iterator<String> it = nGrams.iterator(); it.hasNext(); ) {
Tuple t = new Tuple(1);
t.setField(0, it.next());
output.add(t);
}
}
}
List<String> makeNGrams(String str) {
List<String> tokens = new ArrayList<String>();
StringTokenizer st = new StringTokenizer(str);
while (st.hasMoreTokens())
tokens.add(st.nextToken());
return nGramHelper(tokens, new ArrayList<String>());
}
ArrayList<String> nGramHelper(List<String> str, ArrayList<String> nGrams) {
if (str.size() == 0)
return nGrams;
for (int i = 0; i < str.size(); i++)
nGrams.add(makeString(str.subList(0, i+1)));
return nGramHelper(str.subList(1, str.size()), nGrams);
}
String makeString(List<String> list) {
StringBuilder sb = new StringBuilder();
for (Iterator<String> it = list.iterator(); it.hasNext(); ) {
sb.append(it.next());
if (it.hasNext())
sb.append(" ");
}
return sb.toString();
}
}
@Test
public void testBagFunctionWithFlattening() throws Throwable {
File queryLogFile = createFile(
new String[]{
"stanford\tdeer\tsighting",
"bush\tpresident",
"stanford\tbush",
"conference\tyahoo",
"world\tcup\tcricket",
"bush\twins",
"stanford\tpresident",
}
);
File newsFile = createFile(
new String[]{
"deer seen at stanford",
"george bush visits stanford",
"yahoo hosting a conference in the bay area",
"who will win the world cup"
}
);
Map<String, Integer> expectedResults = new HashMap<String, Integer>();
expectedResults.put("bush", 2);
expectedResults.put("stanford", 3);
expectedResults.put("world", 1);
expectedResults.put("conference", 1);
pigServer.registerQuery("newsArticles = LOAD 'file:" + Util.encodeEscape(newsFile.toString()) + "' USING " + TextLoader.class.getName() + "();");
pigServer.registerQuery("queryLog = LOAD 'file:" + Util.encodeEscape(queryLogFile.toString()) + "';");
pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));");
pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 INNER, queryLog BY $0 INNER;");
pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");
Iterator<Tuple> iter = pigServer.openIterator("answer");
while(iter.hasNext()){
Tuple t = iter.next();
assertEquals(expectedResults.get(t.getAtomField(1).strval()).doubleValue(),t.getAtomField(0).numval().doubleValue());
}
}
@Test
public void testSort() throws Throwable {
testSortDistinct(false);
}
@Test
public void testDistinct() throws Throwable {
testSortDistinct(true);
}
private void testSortDistinct(boolean eliminateDuplicates) throws Throwable {
int LOOP_SIZE = 1024*16;
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
Random r = new Random();
for(int i = 0; i < LOOP_SIZE; i++) {
ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i);
}
ps.close();
String tmpOutputFile = FileLocalizer.getTemporaryPath(null, pigServer.getPigContext()).toString();
pigServer.registerQuery("A = LOAD 'file:" + Util.encodeEscape(tmpFile.toString()) + "';");
if (eliminateDuplicates){
pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
}else{
pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
}
pigServer.store("B", tmpOutputFile);
pigServer.registerQuery("A = load '" + Util.encodeEscape(tmpOutputFile.toString()) + "';");
Iterator<Tuple> iter = pigServer.openIterator("A");
int last = -1;
while (iter.hasNext()){
Tuple t = iter.next();
if (eliminateDuplicates){
assertTrue(last < t.getAtomField(0).numval().intValue());
}else{
assertTrue(last <= t.getAtomField(0).numval().intValue());
assertEquals(t.arity(), 2);
}
}
}
}