// blob: 17e2613aa192da13ae25bbd210d2dfe2dac24641 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.*;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.SingleTupleBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.MultiMap;
import org.junit.Before;
import org.junit.Test;
/**
 * Tests for reading tuples out of data bags.
 *
 * <p>Two kinds of access are covered: iterating directly over
 * {@link SingleTupleBag} and {@link NonSpillableDataBag} instances, and
 * running Pig Latin scripts through a {@link PigServer} that project,
 * flatten, store and re-load bag-typed fields.
 */
public class TestDataBagAccess {

    /** Server used to register and run the queries; re-created before every test. */
    private PigServer pigServer;

    /**
     * Creates a fresh {@link PigServer} in the mode returned by
     * {@code Util.getLocalTestMode()} with empty properties.
     */
    @Before
    public void setUp() throws Exception {
        pigServer = new PigServer(Util.getLocalTestMode(), new Properties());
    }

    /**
     * A {@link SingleTupleBag} must hand back exactly the tuple it was built
     * from and then report that it is exhausted.
     */
    @Test
    public void testSingleTupleBagAcess() throws Exception {
        Tuple inputTuple = new DefaultTuple();
        inputTuple.append("a");
        inputTuple.append("b");

        SingleTupleBag bag = new SingleTupleBag(inputTuple);
        Iterator<Tuple> it = bag.iterator();
        assertEquals(inputTuple, it.next());
        assertFalse(it.hasNext());
    }

    /**
     * A {@link NonSpillableDataBag} must return the tuples added to it, in
     * insertion order, and no others.
     */
    @Test
    public void testNonSpillableDataBag() throws Exception {
        String[][] tupleContents = new String[][] {{"a", "b"}, {"c", "d"}, {"e", "f"}};

        NonSpillableDataBag bag = new NonSpillableDataBag();
        for (String[] fields : tupleContents) {
            bag.add(Util.createTuple(fields));
        }

        int seenCount = 0;
        Iterator<Tuple> it = bag.iterator();
        while (it.hasNext()) {
            Tuple actual = it.next();
            assertEquals(Util.createTuple(tupleContents[seenCount]), actual);
            seenCount++;
        }
        assertEquals(tupleContents.length, seenCount);
    }

    /**
     * Projecting each named field out of a constant bag must yield one bag per
     * field whose first tuple carries the constant with the declared type.
     */
    @Test
    public void testBagConstantAccess() throws IOException, ExecException {
        File input = Util.createInputFile("tmp", "",
                new String[] {"sampledata\tnot_used"});
        pigServer.registerQuery("a = load '"
                + Util.generateURI(input.toString(), pigServer.getPigContext()) + "';");
        pigServer.registerQuery("b = foreach a generate {(16, 4.0e-2, 'hello', -101)} as mybag:{t:(i: int, d: double, c: chararray, e : int)};");
        pigServer.registerQuery("c = foreach b generate mybag.i, mybag.d, mybag.c, mybag.e;");

        Iterator<Tuple> it = pigServer.openIterator("c");
        Tuple row = it.next();

        Object[] expectedValues = new Object[] {
            Integer.valueOf(16), Double.valueOf(4.0e-2), "hello", Integer.valueOf(-101) };
        Class<?>[] expectedTypes = new Class<?>[] {
            Integer.class, Double.class, String.class, Integer.class };

        assertEquals(expectedValues.length, row.size());
        for (int col = 0; col < expectedValues.length; col++) {
            DataBag bag = (DataBag) row.get(col);
            // Read the projected value once and check both its value and its runtime type.
            Object actual = bag.iterator().next().get(0);
            assertEquals(expectedValues[col], actual);
            assertEquals(expectedTypes[col], actual.getClass());
        }
    }

    /**
     * Projecting the tuple alias {@code t} out of a bag (rather than one of the
     * tuple's fields) must be rejected with a {@link FrontendException} whose
     * message names the missing field and the available schema.
     */
    @Test
    public void testBagConstantAccessFailure() throws IOException, ExecException {
        File input = Util.createInputFile("tmp", "",
                new String[] {"sampledata\tnot_used"});
        pigServer.setValidateEachStatement(true);
        try {
            pigServer.registerQuery("a = load '"
                    + Util.generateURI(input.toString(), pigServer.getPigContext()) + "';");
            pigServer.registerQuery("b = foreach a generate {(16, 4.0e-2, 'hello')} as mybag:{t:(i: int, d: double, c: chararray)};");
            pigServer.registerQuery("c = foreach b generate mybag.t;");
            pigServer.explain("c", System.out);
            // Reaching this line means the invalid projection was accepted.
            fail("Expected a FrontendException for the projection 'mybag.t'");
        } catch (FrontendException e) {
            Util.checkStrContainsSubStr(e.getMessage(),
                    "Cannot find field t in i:int,d:double,c:chararray");
        }
    }

    /**
     * Flattening a constant bag of tuple pairs must expose the nested tuples
     * under the aliases given in the schema, so their fields can be projected.
     */
    @Test
    public void testBagConstantFlatten1() throws IOException, ExecException {
        File input = Util.createInputFile("tmp", "",
                new String[] {"sampledata\tnot_used"});
        pigServer.registerQuery("A = load '"
                + Util.generateURI(input.toString(), pigServer.getPigContext()) + "';");
        pigServer.registerQuery("B = foreach A generate {(('p1-t1-e1', 'p1-t1-e2'),('p1-t2-e1', 'p1-t2-e2'))," +
                "(('p2-t1-e1', 'p2-t1-e2'), ('p2-t2-e1', 'p2-t2-e2'))};");
        pigServer.registerQuery("C = foreach B generate $0 as pairbag : { pair: ( t1: (e1, e2), t2: (e1, e2) ) };");
        pigServer.registerQuery("D = foreach C generate FLATTEN(pairbag);");
        pigServer.registerQuery("E = foreach D generate t1.e2 as t1e2, t2.e1 as t2e1;");

        Iterator<Tuple> it = pigServer.openIterator("E");
        // We should get the following two tuples as the result:
        // (p1-t1-e2,p1-t2-e1)
        // (p2-t1-e2,p2-t2-e1)
        Tuple first = it.next();
        assertEquals("p1-t1-e2", first.get(0));
        assertEquals("p1-t2-e1", first.get(1));

        Tuple second = it.next();
        assertEquals("p2-t1-e2", second.get(0));
        assertEquals("p2-t2-e1", second.get(1));

        assertFalse(it.hasNext());
    }

    /**
     * Flattening a loaded bag must make its tuple fields addressable both by
     * their plain aliases ({@code s}, {@code i}) and by their bag-qualified
     * aliases ({@code bg::s}, {@code bg::i}), with the declared types.
     */
    @Test
    public void testBagConstantFlatten2() throws IOException, ExecException {
        File input = Util.createInputFile("tmp", "",
                new String[] {"somestring\t10\t{(a,10),(b,20)}"});
        pigServer.registerQuery("a = load '"
                + Util.generateURI(input.toString(), pigServer.getPigContext())
                + "' " + "as (str:chararray, intval:int, bg:bag{t:tuple(s:chararray, i:int)});");
        pigServer.registerQuery("b = foreach a generate str, intval, flatten(bg);");

        Object[][] expectedRows = new Object[][] {
            {"somestring", Integer.valueOf(10), "a", Integer.valueOf(10)},
            {"somestring", Integer.valueOf(10), "b", Integer.valueOf(20)}};
        Class<?>[] expectedTypes = new Class<?>[] {
            String.class, Integer.class, String.class, Integer.class };

        // Projection by plain alias.
        pigServer.registerQuery("c = foreach b generate str, intval, s, i;");
        assertRowsMatch(pigServer.openIterator("c"), expectedRows, expectedTypes);

        // Same projection using the bag-qualified aliases.
        pigServer.registerQuery("c = foreach b generate str, intval, bg::s, bg::i;");
        assertRowsMatch(pigServer.openIterator("c"), expectedRows, expectedTypes);
    }

    /**
     * Groups the input by its first column, stores the grouped relation with
     * each of {@link BinStorage} and {@link PigStorage}, re-loads it with an
     * explicit schema and checks the re-loaded bags directly, after flattening,
     * and after projecting the flattened fields by alias and by position.
     */
    @Test
    public void testBagStoreLoad() throws IOException, ExecException {
        File input = Util.createInputFile("tmp", "",
                new String[] {"a\tid1", "a\tid2", "a\tid3", "b\tid4", "b\tid5", "b\tid6"});
        pigServer.registerQuery("a = load '"
                + Util.generateURI(input.toString(), pigServer.getPigContext())
                + "' " + "as (s:chararray, id:chararray);");
        pigServer.registerQuery("b = group a by s;");

        // Expected ids per group key; identical for every storage class.
        MultiMap<Object, Object> expectedIdsByKey = new MultiMap<Object, Object>();
        expectedIdsByKey.put("a", "id1");
        expectedIdsByKey.put("a", "id2");
        expectedIdsByKey.put("a", "id3");
        expectedIdsByKey.put("b", "id4");
        expectedIdsByKey.put("b", "id5");
        expectedIdsByKey.put("b", "id6");

        // Expected flattened rows:
        // a a id1
        // a a id2
        // a a id3
        // b b id4
        // b b id5
        // b b id6
        // However order is not guaranteed
        List<Tuple> expectedFlat = new ArrayList<Tuple>();
        expectedFlat.add(Util.createTuple(new String[] { "a", "a", "id1"}));
        expectedFlat.add(Util.createTuple(new String[] { "a", "a", "id2"}));
        expectedFlat.add(Util.createTuple(new String[] { "a", "a", "id3"}));
        expectedFlat.add(Util.createTuple(new String[] { "b", "b", "id4"}));
        expectedFlat.add(Util.createTuple(new String[] { "b", "b", "id5"}));
        expectedFlat.add(Util.createTuple(new String[] { "b", "b", "id6"}));

        Class<?>[] loadStoreClasses = new Class<?>[] { BinStorage.class, PigStorage.class };
        for (Class<?> loadStoreClass : loadStoreClasses) {
            String storageName = loadStoreClass.getName();
            String output = "TestDataBagAccess-testBagStoreLoad-" +
                    storageName + ".txt";
            pigServer.deleteFile(output);
            pigServer.store("b", output, storageName);
            pigServer.registerQuery("c = load '" + output + "' using " + storageName + "() AS " +
                    "(gp: chararray, bg:bag { t: tuple (sReLoaded: chararray, idReLoaded: chararray)});;");

            // The re-loaded relation still holds one bag per group key.
            assertGroupsMatch(pigServer.openIterator("c"), expectedIdsByKey, storageName);

            pigServer.registerQuery("d = foreach c generate gp, flatten(bg);");
            assertEachExpectedTupleOnce(pigServer.openIterator("d"), expectedFlat, storageName);

            // same output as above - but projection based on aliases
            pigServer.registerQuery("e = foreach d generate gp, sReLoaded, idReLoaded;");
            assertEachExpectedTupleOnce(pigServer.openIterator("e"), expectedFlat, storageName);

            // same result as above but projection based on position specifiers
            pigServer.registerQuery("f = foreach d generate $0, $1, $2;");
            assertEachExpectedTupleOnce(pigServer.openIterator("f"), expectedFlat, storageName);
        }
    }

    /**
     * Asserts that {@code rows} yields exactly {@code expectedRows}, in order,
     * and that every column value has the runtime type in {@code expectedTypes}.
     */
    private static void assertRowsMatch(Iterator<Tuple> rows, Object[][] expectedRows,
            Class<?>[] expectedTypes) throws ExecException {
        int rowCount = 0;
        while (rows.hasNext()) {
            Tuple row = rows.next();
            // Fail with a readable message instead of an index error on surplus rows.
            assertTrue("more rows than the expected " + expectedRows.length,
                    rowCount < expectedRows.length);
            for (int col = 0; col < expectedTypes.length; col++) {
                Object actual = row.get(col);
                assertEquals(expectedRows[rowCount][col], actual);
                assertEquals(expectedTypes[col], actual.getClass());
            }
            rowCount++;
        }
        assertEquals(expectedRows.length, rowCount);
    }

    /**
     * Asserts that {@code groups} yields one (key, bag) tuple per key of
     * {@code expectedIdsByKey}, and that every bag holds three (key, id)
     * tuples whose ids belong to that key and are not repeated within the bag.
     */
    private static void assertGroupsMatch(Iterator<Tuple> groups,
            MultiMap<Object, Object> expectedIdsByKey, String loadStoreClassName)
            throws ExecException {
        int groupCount = 0;
        while (groups.hasNext()) {
            Tuple group = groups.next();
            Object groupKey = group.get(0);
            DataBag groupBag = (DataBag) group.get(1);

            Collection<Object> expectedIds = expectedIdsByKey.get(groupKey);
            assertNotNull("unexpected group key " + groupKey, expectedIds);

            // Tracks the ids seen in this bag. It must outlive the loop below:
            // a set re-created per tuple could never detect a duplicate.
            Set<Object> seenIds = new HashSet<Object>();
            int bagSize = 0;
            Iterator<Tuple> bagIt = groupBag.iterator();
            while (bagIt.hasNext()) {
                Tuple bagTuple = bagIt.next();
                // the first col is the group by key
                assertEquals(groupKey, bagTuple.get(0));
                // the second column is one of the "id" values associated
                // with this group by key
                Object id = bagTuple.get(1);
                assertTrue(expectedIds.contains(id));
                // add() returns false when the id was already present
                if (!seenIds.add(id)) {
                    fail("LoadStoreClass used : " + loadStoreClassName + " " +
                            ", duplicate value (" + id + ")");
                }
                bagSize++;
            }
            // check that we saw 3 tuples in each group bag
            assertEquals(3, bagSize);
            groupCount++;
        }
        // make sure we saw the right number of high level tuples
        assertEquals(expectedIdsByKey.keySet().size(), groupCount);
    }

    /**
     * Asserts that {@code actual} yields every tuple of {@code expected}
     * exactly once, in any order, and nothing else.
     */
    private static void assertEachExpectedTupleOnce(Iterator<Tuple> actual,
            List<Tuple> expected, String loadStoreClassName) {
        Set<Tuple> seen = new HashSet<Tuple>();
        int count = 0;
        while (actual.hasNext()) {
            Tuple t = actual.next();
            assertTrue(expected.contains(t));
            if (!seen.add(t)) {
                fail("LoadStoreClass used : " + loadStoreClassName + " " +
                        ", duplicate tuple (" + t + ") encountered.");
            }
            count++;
        }
        // check we got expected number of tuples
        assertEquals(expected.size(), count);
    }
}