blob: 6b0cb4de36088f30eb5b841a462509bf7b4e48a8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.apache.pig.PigServer.ExecType.MAPREDUCE;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Iterator;
import java.util.Properties;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.EvalFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigServer;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.builtin.COUNT;
import org.apache.pig.data.DataAtom;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
import org.junit.Before;
import org.junit.Test;
public class TestMapReduce extends TestCase {
private Log log = LogFactory.getLog(getClass());
MiniCluster cluster = MiniCluster.buildCluster();
private PigServer pig;
@Before
@Override
protected void setUp() throws Exception {
pig = new PigServer(MAPREDUCE, cluster.getProperties());
}
@Test
public void testBigGroupAll() throws Throwable {
int LOOP_COUNT = 4*1024;
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
ps.println(i);
}
ps.close();
String query = "foreach (group (load 'file:" + Util.encodeEscape(tmpFile.toString()) + "') all) generate " + COUNT.class.getName() + "($1) ;";
System.out.println(query);
pig.registerQuery("asdf_id = " + query);
Iterator it = pig.openIterator("asdf_id");
tmpFile.delete();
Tuple t = (Tuple)it.next();
Double count = t.getAtomField(0).numval();
assertEquals(count, (double)LOOP_COUNT);
}
static public class MyApply extends EvalFunc<DataBag> {
String field0 = "Got";
public MyApply() {}
public MyApply(String field0) {
this.field0 = field0;
}
@Override
public void exec(Tuple input, DataBag output) throws IOException {
Iterator<Tuple> it = (input.getBagField(0)).iterator();
while(it.hasNext()) {
Tuple t = it.next();
Tuple newT = new Tuple(2);
newT.setField(0, field0);
newT.setField(1, t.getField(0).toString());
output.add(newT);
}
}
}
static public class MyGroup extends EvalFunc<Tuple> {
@Override
public void exec(Tuple input, Tuple output) throws IOException{
output.appendField(new DataAtom("g"));
}
}
static public class MyStorage implements LoadFunc, StoreFunc {
final static int COUNT = 10;
int count = 0;
public void bindTo(String fileName, BufferedPositionedInputStream is, long offset, long end) throws IOException {
}
public Tuple getNext() throws IOException {
if (count < COUNT) {
Tuple t = new Tuple(Integer.toString(count++));
return t;
}
return null;
}
OutputStream os;
public void bindTo(OutputStream os) throws IOException {
this.os = os;
}
public void finish() throws IOException {
}
public void putNext(Tuple f) throws IOException {
os.write((f.toDelimitedString("-")+"\n").getBytes());
}
}
@Test
public void testStoreFunction() throws Throwable {
File tmpFile = File.createTempFile("test", ".txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < 10; i++) {
ps.println(i+"\t"+i);
}
ps.close();
String query = "foreach (load 'file:"+Util.encodeEscape(tmpFile.toString())+"') generate $0,$1;";
System.out.println(query);
pig.registerQuery("asdf_id = " + query);
try {
pig.deleteFile("frog");
} catch(Exception e) {}
pig.store("asdf_id", "frog", MyStorage.class.getName()+"()");
InputStream is = FileLocalizer.open("frog", pig.getPigContext());
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String line;
int i = 0;
while((line = br.readLine()) != null) {
assertEquals(line, Integer.toString(i) + '-' + Integer.toString(i));
i++;
}
br.close();
pig.deleteFile("frog");
}
@Test
public void testQualifiedFuncions() throws Throwable {
File tmpFile = File.createTempFile("test", ".txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < 1; i++) {
ps.println(i);
}
ps.close();
String query = "foreach (group (load 'file:"+Util.encodeEscape(tmpFile.toString())+"' using " + MyStorage.class.getName() + "()) by " + MyGroup.class.getName() + "('all')) generate flatten(" + MyApply.class.getName() + "($1)) ;";
System.out.println(query);
pig.registerQuery("asdf_id = " + query);
Iterator it = pig.openIterator("asdf_id");
tmpFile.delete();
Tuple t;
int count = 0;
while(it.hasNext()) {
t = (Tuple) it.next();
assertEquals(t.getField(0).toString(), "Got");
Integer.parseInt(t.getField(1).toString());
count++;
}
assertEquals(count, MyStorage.COUNT);
}
@Test
public void testDefinedFunctions() throws Throwable {
File tmpFile = File.createTempFile("test", ".txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < 1; i++) {
ps.println(i);
}
ps.close();
pig.registerFunction("foo", MyApply.class.getName()+"('foo')");
String query = "foreach (group (load 'file:"+Util.encodeEscape(tmpFile.toString())+"' using " + MyStorage.class.getName() + "()) by " + MyGroup.class.getName() + "('all')) generate flatten(foo($1)) ;";
System.out.println(query);
pig.registerQuery("asdf_id = " + query);
Iterator it = pig.openIterator("asdf_id");
tmpFile.delete();
Tuple t;
int count = 0;
while(it.hasNext()) {
t = (Tuple) it.next();
assertEquals("foo", t.getField(0).toString());
Integer.parseInt(t.getField(1).toString());
count++;
}
assertEquals(count, MyStorage.COUNT);
}
@Test
public void testPigServer() throws Throwable {
log.debug("creating pig server");
PigContext pigContext = new PigContext(MAPREDUCE, cluster.getProperties());
PigServer pig = new PigServer(pigContext);
System.out.println("testing capacity");
long capacity = pig.capacity();
assertTrue(capacity > 0);
String sampleFileName = "/tmp/fileTest";
if (!pig.existsFile(sampleFileName)) {
ElementDescriptor path = pigContext.getDfs().asElement(sampleFileName);
OutputStream os = path.create();
os.write("Ben was here!".getBytes());
os.close();
}
long length = pig.fileSize(sampleFileName);
assertTrue(length > 0);
}
}