// blob: eaf08d15ba34cef2fb64d3ee5eae79bfe47c787e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import java.io.*;
import java.util.Iterator;
import java.util.Map;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.test.utils.TestHelper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class TestSkewedJoin extends TestCase{
// Names of the seven input data files. createFiles() writes each of them
// locally and copies it to the mini cluster under the same name; tearDown()
// removes them from both places again.
private static final String INPUT_FILE1 = "SkewedJoinInput1.txt";
private static final String INPUT_FILE2 = "SkewedJoinInput2.txt";
private static final String INPUT_FILE3 = "SkewedJoinInput3.txt";
private static final String INPUT_FILE4 = "SkewedJoinInput4.txt";
private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
private static final String INPUT_FILE6 = "SkewedJoinInput6.txt";
private static final String INPUT_FILE7 = "SkewedJoinInput7.txt";
// Pig server the tests register and run their queries through; built in the
// constructor (testSkewedJoinWithNoProperties replaces it with a fresh one).
private PigServer pigServer;
// Mini cluster shared by all tests of this class; shut down in oneTimeTearDown().
private static MiniCluster cluster = MiniCluster.buildCluster();
/**
 * Builds a MapReduce-mode PigServer on top of the shared mini cluster and
 * sets the two skewed-join properties ("maxtuple" and "memusage") that the
 * tests run with.
 */
public TestSkewedJoin() throws ExecException, IOException{
    // To run in local mode instead, construct the server with ExecType.LOCAL.
    this.pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
    this.pigServer.getPigContext().getProperties()
            .setProperty("pig.skewedjoin.reduce.maxtuple", "5");
    this.pigServer.getPigContext().getProperties()
            .setProperty("pig.skewedjoin.reduce.memusage", "0.01");
}
/** Regenerates the input files (locally and on the cluster) before each test. */
@Before
public void setUp() throws Exception {
    this.createFiles();
}
/** Shuts the shared mini cluster down once all tests of the class have run. */
@AfterClass
public static void oneTimeTearDown() throws Exception {
    TestSkewedJoin.cluster.shutDown();
}
/**
 * Writes the seven input files into the local working directory and then
 * copies each of them to the mini cluster under the same name.
 *
 * @throws IOException if a file cannot be created or copied
 */
private void createFiles() throws IOException {
    // File 1: 120 groups of three rows (keys 100, 200, 300); the third column
    // carries a running counter so every row is distinct.
    PrintWriter out1 = new PrintWriter(new FileWriter(INPUT_FILE1));
    for (int counter = 0; counter < 360; counter += 3) {
        out1.println("100\tapple1\taaa" + counter);
        out1.println("200\torange1\tbbb" + (counter + 1));
        out1.println("300\tstrawberry\tccc" + (counter + 2));
    }
    out1.close();

    // File 2: a handful of (id, name) rows, including a duplicated row.
    String[] rows2 = {
        "100\tapple1",
        "100\tapple2",
        "100\tapple2",
        "200\torange1",
        "200\torange2",
        "300\tstrawberry",
        "400\tpear"
    };
    PrintWriter out2 = new PrintWriter(new FileWriter(INPUT_FILE2));
    for (String row : rows2) {
        out2.println(row);
    }
    out2.close();

    // File 3: (id, name) rows where key 400 occurs exactly once.
    String[] rows3 = {
        "100\tapple1",
        "100\tapple2",
        "200\torange1",
        "200\torange2",
        "300\tstrawberry",
        "300\tstrawberry2",
        "400\tpear"
    };
    PrintWriter out3 = new PrintWriter(new FileWriter(INPUT_FILE3));
    for (String row : rows3) {
        out3.println(row);
    }
    out3.close();

    // File 4: the same map literal repeated 100 times.
    String mapRow =
        "[a100#apple1,a100#apple2,a200#orange1,a200#orange2,a300#strawberry,a300#strawberry2,a400#pear]";
    PrintWriter out4 = new PrintWriter(new FileWriter(INPUT_FILE4));
    for (int remaining = 100; remaining > 0; remaining--) {
        out4.println(mapRow);
    }
    out4.close();

    // File 5: mostly rows with an empty (null) key.
    PrintWriter out5 = new PrintWriter(new FileWriter(INPUT_FILE5));
    for (int n = 0; n < 10; n++) {
        out5.println("\tapple1");
    }
    out5.println("100\tapple2");
    for (int n = 0; n < 10; n++) {
        out5.println("\torange1");
    }
    out5.println("\t");
    out5.println("100\t");
    out5.close();

    // File 6: keys 0..299, five rows per key.
    PrintWriter out6 = new PrintWriter(new FileWriter(INPUT_FILE6));
    for (int key = 0; key < 300; key++) {
        for (int val = 0; val < 5; val++) {
            out6.println(key + "\t" + val);
        }
    }
    out6.close();

    // File 7: every third key of 0..299, two rows per key.
    PrintWriter out7 = new PrintWriter(new FileWriter(INPUT_FILE7));
    for (int key = 0; key < 300; key += 3) {
        for (int val = 0; val < 2; val++) {
            out7.println(key + "\t" + val);
        }
    }
    out7.close();

    // Upload everything once all local files exist.
    String[] inputs = {
        INPUT_FILE1, INPUT_FILE2, INPUT_FILE3, INPUT_FILE4,
        INPUT_FILE5, INPUT_FILE6, INPUT_FILE7
    };
    for (String input : inputs) {
        Util.copyFromLocalToCluster(cluster, input, input);
    }
}
/**
 * Removes the input files from the local directory, deletes the local
 * "skewedjoin" output directory, and removes the input files from the
 * cluster.
 */
@After
public void tearDown() throws Exception {
    String[] inputs = {
        INPUT_FILE1, INPUT_FILE2, INPUT_FILE3, INPUT_FILE4,
        INPUT_FILE5, INPUT_FILE6, INPUT_FILE7
    };
    // Local copies first ...
    for (String input : inputs) {
        new File(input).delete();
    }
    Util.deleteDirectory(new File("skewedjoin"));
    // ... then the copies on the cluster.
    for (String input : inputs) {
        Util.deleteFile(cluster, input);
    }
}
/**
 * Joins two grouped relations on their group key, once using the 'skewed'
 * join and once using the default join, and checks that both results are
 * non-empty and equal.
 */
@Test
public void testSkewedJoinWithGroup() throws IOException{
    pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
    pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
    pigServer.registerQuery("C = GROUP A by id;");
    pigServer.registerQuery("D = GROUP B by id;");

    DataBag skewedResult = BagFactory.getInstance().newDefaultBag();
    DataBag defaultResult = BagFactory.getInstance().newDefaultBag();

    // Result of the skewed join.
    pigServer.registerQuery("E = join C by group, D by group using 'skewed' parallel 5;");
    for (Iterator<Tuple> it = pigServer.openIterator("E"); it.hasNext(); ) {
        skewedResult.add(it.next());
    }

    // Result of the default join, used as the reference.
    pigServer.registerQuery("E = join C by group, D by group;");
    for (Iterator<Tuple> it = pigServer.openIterator("E"); it.hasNext(); ) {
        defaultResult.add(it.next());
    }

    Assert.assertTrue(skewedResult.size()>0 && defaultResult.size()>0);
    Assert.assertEquals(true, TestHelper.compareBags(skewedResult, defaultResult));
}
/**
 * Runs a two-column skewed join on a freshly created PigServer and checks
 * that it yields the same non-empty result as the default join.
 *
 * Exceptions are left to propagate so that the test runner reports them with
 * their full stack trace; previously they were converted into
 * {@code fail(e.getMessage())}, which dropped the trace and could produce a
 * null failure message.
 */
@Test
public void testSkewedJoinWithNoProperties() throws IOException{
    // Replace the server created by the constructor with a new one that has
    // not had the pig.skewedjoin.* properties set on it explicitly.
    // NOTE(review): presumably this makes the defaults apply - confirm that
    // PigContext does not share the Properties instance with the cluster.
    pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
    pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
    pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");

    DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
    DataBag dbshj = BagFactory.getInstance().newDefaultBag();
    {
        // skewed join result
        pigServer.registerQuery("C = join A by (id, name), B by (id, name) using 'skewed' parallel 5;");
        Iterator<Tuple> iter = pigServer.openIterator("C");
        while(iter.hasNext()) {
            dbfrj.add(iter.next());
        }
    }
    {
        // default join result, used as the reference
        pigServer.registerQuery("E = join A by(id, name), B by (id, name);");
        Iterator<Tuple> iter = pigServer.openIterator("E");
        while(iter.hasNext()) {
            dbshj.add(iter.next());
        }
    }
    Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
    Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
/**
 * Runs a skewed join with {@code parallel 1} and expects it to complete
 * without throwing; the joined tuples are read but not inspected.
 */
@Test
public void testSkewedJoinReducers() throws IOException{
    pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
    pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
    try {
        DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
        {
            pigServer.registerQuery("C = join A by id, B by id using 'skewed' parallel 1;");
            Iterator<Tuple> iter = pigServer.openIterator("C");
            // Drain the iterator so the whole join actually runs.
            while(iter.hasNext()) {
                dbfrj.add(iter.next());
            }
        }
    }catch(Exception e) {
        // Report what went wrong before failing, as the other tests in this
        // class do; otherwise the cause of the failure is lost.
        System.out.println(e.getMessage());
        e.printStackTrace();
        fail("Should not throw exception, should continue execution");
    }
}
/**
 * Attempts a three-way skewed join and expects it to be rejected with an
 * exception; the test fails if the join runs to completion.
 */
@Test
public void testSkewedJoin3Way() throws IOException{
    pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
    pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
    pigServer.registerQuery("C = LOAD '" + INPUT_FILE3 + "' as (id, name);");

    boolean rejected = false;
    try {
        DataBag joined = BagFactory.getInstance().newDefaultBag();
        pigServer.registerQuery("D = join A by id, B by id, C by id using 'skewed' parallel 5;");
        for (Iterator<Tuple> it = pigServer.openIterator("D"); it.hasNext(); ) {
            joined.add(it.next());
        }
    } catch (Exception expected) {
        // Any exception counts as the expected rejection.
        rejected = true;
    }

    if (!rejected) {
        fail("Should throw exception, do not support 3 way join");
    }
}
/**
 * Runs a skewed join whose keys are map lookups cast to chararray and
 * expects it to complete without an exception.
 */
@Test
public void testSkewedJoinMapKey() throws IOException{
    pigServer.registerQuery("A = LOAD '" + INPUT_FILE4 + "' as (m:[]);");
    pigServer.registerQuery("B = LOAD '" + INPUT_FILE4 + "' as (n:[]);");
    try {
        DataBag joined = BagFactory.getInstance().newDefaultBag();
        pigServer.registerQuery("C = join A by (chararray)m#'a100', B by (chararray)n#'a100' using 'skewed' parallel 20;");
        // Read every tuple so the join is fully executed.
        for (Iterator<Tuple> it = pigServer.openIterator("C"); it.hasNext(); ) {
            joined.add(it.next());
        }
    } catch (Exception e) {
        System.out.println(e.getMessage());
        e.printStackTrace();
        fail("Should support maps and expression operators as keys");
    }
}
/**
 * Stores the result of a skewed join run with {@code parallel 7}, copies the
 * seven part files back from the cluster, and checks that the three keys
 * (100, 200, 300) are spread over more than three (key, part file) pairs,
 * i.e. that at least one key was written to more than one part file.
 *
 * Each part-file reader is closed in a {@code finally} block; previously the
 * readers were never closed.
 */
@Test
public void testSkewedJoinKeyPartition() throws IOException {
    try{
        Util.deleteFile(cluster, "skewedjoin");
    }catch(Exception ignored){
        // it is ok if the directory does not exist
    }
    pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
    pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
    pigServer.registerQuery("E = join A by id, B by id using 'skewed' parallel 7;");
    pigServer.store("E", "skewedjoin");

    // lineCount[key][part]: number of rows with that key in that part file,
    // where key index 0, 1, 2 stands for id 100, 200, 300.
    int[][] lineCount = new int[3][7];
    new File("skewedjoin").mkdir();
    // check how many times a key appears in each part- file
    for(int i=0; i<7; i++) {
        String partFile = "skewedjoin/part-r-0000"+i;
        Util.copyFromClusterToLocal(cluster, partFile, partFile);
        BufferedReader reader = new BufferedReader(new FileReader(partFile));
        try {
            String line = null;
            while((line = reader.readLine()) != null) {
                String[] cols = line.split("\t");
                // map id 100/200/300 to row index 0/1/2
                int key = Integer.parseInt(cols[0])/100 -1;
                lineCount[key][i] ++;
            }
        } finally {
            reader.close();
        }
    }

    // count the (key, part file) pairs that contain at least one row
    int fc = 0;
    for(int i=0; i<3; i++) {
        for(int j=0; j<7; j++) {
            if (lineCount[i][j] > 0) {
                fc ++;
            }
        }
    }
    // at least one key should be a skewed key:
    // check that at least one key appears in more than 1 part- file
    assertTrue(fc > 3);
}
/**
 * Self-joins the file containing empty keys with a skewed join and expects
 * the join to complete without an exception.
 */
@Test
public void testSkewedJoinNullKeys() throws IOException {
    pigServer.registerQuery("A = LOAD '" + INPUT_FILE5 + "' as (id,name);");
    pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);");
    try {
        DataBag joined = BagFactory.getInstance().newDefaultBag();
        pigServer.registerQuery("C = join A by id, B by id using 'skewed';");
        // Read every tuple so the join is fully executed.
        for (Iterator<Tuple> it = pigServer.openIterator("C"); it.hasNext(); ) {
            joined.add(it.next());
        }
    } catch (Exception e) {
        System.out.println(e.getMessage());
        e.printStackTrace();
        fail("Should support null keys in skewed join");
    }
}
/**
 * Runs the left, right and full outer variants of the skewed join, in that
 * order, and expects each of them to complete without an exception.
 */
@Test
public void testSkewedJoinOuter() throws IOException {
    pigServer.registerQuery("A = LOAD '" + INPUT_FILE5 + "' as (id,name);");
    pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);");
    try {
        DataBag joined = BagFactory.getInstance().newDefaultBag();
        String[] outerJoins = {
            "C = join A by id left, B by id using 'skewed';",
            "C = join A by id right, B by id using 'skewed';",
            "C = join A by id full, B by id using 'skewed';"
        };
        for (String outerJoin : outerJoins) {
            pigServer.registerQuery(outerJoin);
            // Read every tuple so the join is fully executed.
            for (Iterator<Tuple> it = pigServer.openIterator("C"); it.hasNext(); ) {
                joined.add(it.next());
            }
        }
    } catch (Exception e) {
        System.out.println(e.getMessage());
        e.printStackTrace();
        fail("Should support outer join in skewed join");
    }
}
/**
 * Skewed join over inputs filtered down to a single key (id 400); the result
 * must match the default join in size and content. (pig 1048)
 */
@Test
public void testSkewedJoinOneValue() throws IOException {
    pigServer.registerQuery("A = LOAD '" + INPUT_FILE3 + "' as (id,name);");
    pigServer.registerQuery("B = LOAD '" + INPUT_FILE3 + "' as (id,name);");
    // Keep only the key that has a single value
    pigServer.registerQuery("C = FILTER A by id == 400;");
    pigServer.registerQuery("D = FILTER B by id == 400;");

    DataBag skewedResult = BagFactory.getInstance().newDefaultBag();
    DataBag defaultResult = BagFactory.getInstance().newDefaultBag();

    pigServer.registerQuery("E = join C by id, D by id using 'skewed';");
    for (Iterator<Tuple> it = pigServer.openIterator("E"); it.hasNext(); ) {
        skewedResult.add(it.next());
    }

    pigServer.registerQuery("E = join C by id, D by id;");
    for (Iterator<Tuple> it = pigServer.openIterator("E"); it.hasNext(); ) {
        defaultResult.add(it.next());
    }

    Assert.assertEquals(skewedResult.size(), defaultResult.size());
    Assert.assertEquals(true, TestHelper.compareBags(skewedResult, defaultResult));
}
/**
 * Lowers "pig.skewedjoin.reduce.maxtuple" to 2, runs a skewed join with
 * {@code parallel 300}, and checks the result against the default join.
 */
@Test
public void testSkewedJoinManyReducers() throws IOException {
    pigServer.getPigContext().getProperties()
            .setProperty("pig.skewedjoin.reduce.maxtuple", "2");
    pigServer.registerQuery("A = LOAD '" + INPUT_FILE6 + "' as (id,name);");
    pigServer.registerQuery("B = LOAD '" + INPUT_FILE7 + "' as (id,name);");

    DataBag skewedResult = BagFactory.getInstance().newDefaultBag();
    DataBag defaultResult = BagFactory.getInstance().newDefaultBag();

    pigServer.registerQuery("E = join A by id, B by id using 'skewed' parallel 300;");
    for (Iterator<Tuple> it = pigServer.openIterator("E"); it.hasNext(); ) {
        skewedResult.add(it.next());
    }

    pigServer.registerQuery("E = join A by id, B by id;");
    for (Iterator<Tuple> it = pigServer.openIterator("E"); it.hasNext(); ) {
        defaultResult.add(it.next());
    }

    Assert.assertEquals(skewedResult.size(), defaultResult.size());
    Assert.assertEquals(true, TestHelper.compareBags(skewedResult, defaultResult));
}
/**
 * Left-outer skewed join whose left input is filtered down to nothing; the
 * join must produce no tuples.
 *
 * The temporary input files are removed in a {@code finally} block so that
 * they are cleaned up even when the query or the assertion fails.
 */
@Test
public void testSkewedJoinEmptyInput() throws IOException {
    String LEFT_INPUT_FILE = "left.dat";
    String RIGHT_INPUT_FILE = "right.dat";
    try {
        PrintWriter w = new PrintWriter(new FileWriter(LEFT_INPUT_FILE));
        w.println("1");
        w.println("2");
        w.println("3");
        w.println("5");
        w.close();
        Util.copyFromLocalToCluster(cluster, LEFT_INPUT_FILE, LEFT_INPUT_FILE);

        PrintWriter w2 = new PrintWriter(new FileWriter(RIGHT_INPUT_FILE));
        w2.println("1\tone");
        w2.println("2\ttwo");
        w2.println("3\tthree");
        w2.close();
        Util.copyFromLocalToCluster(cluster, RIGHT_INPUT_FILE, RIGHT_INPUT_FILE);

        pigServer.registerQuery("a = load 'left.dat' as (nums:chararray);");
        pigServer.registerQuery("b = load 'right.dat' as (number:chararray,text:chararray);");
        // No row of 'a' has the value '7', so 'c' is empty.
        pigServer.registerQuery("c = filter a by nums == '7';");
        pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number USING 'skewed';");
        Iterator<Tuple> iter = pigServer.openIterator("d");
        Assert.assertFalse(iter.hasNext());
    } finally {
        // Local files first (File.delete() does not throw), then the cluster copies.
        new File(LEFT_INPUT_FILE).delete();
        new File(RIGHT_INPUT_FILE).delete();
        Util.deleteFile(cluster, LEFT_INPUT_FILE);
        Util.deleteFile(cluster, RIGHT_INPUT_FILE);
    }
}
/**
 * Loads a directory ("foo") whose only data file sits two levels down
 * ("foo/bar/test.dat"), self-joins it with a skewed join, and expects one
 * output tuple per input row (4).
 *
 * The temporary files are removed in a {@code finally} block so that they
 * are cleaned up even when the query or the assertion fails.
 */
@Test
public void testRecursiveFileListing() throws IOException {
    String LOCAL_INPUT_FILE = "test.dat";
    String INPUT_FILE = "foo/bar/test.dat";
    try {
        PrintWriter w = new PrintWriter(new FileWriter(LOCAL_INPUT_FILE));
        w.println("1");
        w.println("2");
        w.println("3");
        w.println("5");
        w.close();
        Util.copyFromLocalToCluster(cluster, LOCAL_INPUT_FILE, INPUT_FILE);

        pigServer.registerQuery("a = load 'foo' as (nums:chararray);");
        pigServer.registerQuery("b = load 'foo' as (nums:chararray);");
        pigServer.registerQuery("d = join a by nums, b by nums USING 'skewed';");
        Iterator<Tuple> iter = pigServer.openIterator("d");
        int count = 0;
        while (iter.hasNext()) {
            iter.next();
            count++;
        }
        // Four distinct values joined with themselves give four tuples.
        Assert.assertEquals(4, count);
    } finally {
        new File(LOCAL_INPUT_FILE).delete();
        Util.deleteFile(cluster, INPUT_FILE);
    }
}
/**
 * Calls the PartitionSkewedKeys UDF directly with a reducer count of 3 and
 * four sample tuples (three with key "1", one with key "2"), then checks the
 * partition-list entry returned for key "1".
 *
 * The two checks on that entry are real assertions now; previously the
 * results of the {@code equals} calls were discarded, so the test could
 * never fail on them. Note that the checks only run if the partition list
 * contains an entry for key "1".
 */
@Test
public void testSkewedJoinUDF() throws IOException {
    PartitionSkewedKeys udf = new PartitionSkewedKeys(new String[]{"0.1", "2", "1.txt"});
    Tuple t = TupleFactory.getInstance().newTuple();
    t.append(3); // use 3 reducers
    DataBag db = new DefaultDataBag();
    for (int i=0;i<=3;i++) {
        // The last sample differs from the first three in key and last field.
        boolean last = (i == 3);
        Tuple sample = TupleFactory.getInstance().newTuple();
        sample.append(last ? "2" : "1");
        sample.append((long)200);
        sample.append(last ? (long)30 : (long)0);
        db.add(sample);
    }
    t.append(db);
    Map<String, Object> output = udf.exec(t);
    DataBag parList = (DataBag)output.get(PartitionSkewedKeys.PARTITION_LIST);
    for (Tuple par : parList) {
        if (par.get(0).equals("1")) {
            // NOTE(review): fields 1 and 2 are presumably the first and last
            // reducer index assigned to the key - confirm against
            // PartitionSkewedKeys.
            Assert.assertEquals(Integer.valueOf(0), par.get(1));
            Assert.assertEquals(Integer.valueOf(2), par.get(2));
        }
    }
}
}