// blob: 8c6051fa7692bf12ff1ed58f050404b28b7f0489 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.LogUtils;
import org.junit.Ignore;
import org.junit.Test;
/**
 * Join test cases shared by concrete test classes.
 *
 * <p>The class is abstract: subclasses provide the way input files are created
 * and deleted, and {@link #pigServer} is not assigned anywhere in this class
 * (presumably subclasses initialise it before each test - confirm against the
 * subclasses).
 *
 * <p>NOTE(review): the {@code @Ignore} annotation presumably keeps test runners
 * from trying to execute this base class directly - confirm.
 */
@Ignore
public abstract class TestJoinBase {

    /** Rows written to the first ({@code a}) input of the name-keyed join tests. */
    private static final String[] LEFT_INPUT_ROWS = {
        "hello\t1",
        "bye\t2",
        "\t3"
    };

    /** Rows written to the second ({@code b}) input of the name-keyed join tests. */
    private static final String[] RIGHT_INPUT_ROWS = {
        "hello\tworld",
        "good\tmorning",
        "\tevening"
    };

    /** Server the scripts are registered with; not assigned in this class. */
    protected PigServer pigServer;

    // Not used by the tests in this class; kept because other classes in the
    // package may rely on them.
    TupleFactory mTf = TupleFactory.getInstance();
    BagFactory mBf = BagFactory.getInstance();

    /**
     * Creates an input file holding {@code data}.
     *
     * @param fileNameHint name suggestion for the file (the tests pass
     *        {@code "a.txt"} / {@code "b.txt"})
     * @param data content of the file; presumably one record per element
     * @return the name the tests use in {@code load} statements and later pass
     *         to {@link #deleteInputFile(String)}
     * @throws IOException if the file cannot be created
     */
    protected abstract String createInputFile(String fileNameHint, String[] data) throws IOException;

    /**
     * Deletes a file previously returned by {@link #createInputFile(String, String[])}.
     *
     * @param fileName name returned by {@code createInputFile}
     * @throws Exception if the file cannot be deleted
     */
    protected abstract void deleteInputFile(String fileName) throws Exception;

    /** Joining two relations loaded without a schema must yield a null schema. */
    @Test
    public void testJoinUnkownSchema() throws Exception {
        // If any of the input schemas is unknown, the resulting schema should be unknown as well.
        String script = "a = load 'a.txt';"
                + "b = load 'b.txt'; "
                + "c = join a by $0, b by $0;";
        Util.registerMultiLineQuery(pigServer, script);
        Schema schema = pigServer.dumpSchema("c");
        assertNull(schema);
    }

    /**
     * Inner join on the first column, once with declared schemas and once
     * without. In both cases the only expected row is the {@code hello} match.
     */
    @Test
    public void testDefaultJoin() throws Exception {
        String firstInput = null;
        String secondInput = null;
        try {
            firstInput = createInputFile("a.txt", LEFT_INPUT_ROWS);
            secondInput = createInputFile("b.txt", RIGHT_INPUT_ROWS);
            Tuple expectedResult = (Tuple) Util.getPigConstant("('hello',1,'hello','world')");

            // with schema
            String script = "a = load '" + Util.encodeEscape(firstInput) + "' as (n:chararray, a:int); "
                    + "b = load '" + Util.encodeEscape(secondInput) + "' as (n:chararray, m:chararray); "
                    + "c = join a by $0, b by $0;";
            Util.registerMultiLineQuery(pigServer, script);
            assertOnlyTuple(expectedResult, pigServer.openIterator("c"));

            // without schema
            script = "a = load '" + Util.encodeEscape(firstInput) + "'; "
                    + "b = load '" + Util.encodeEscape(secondInput) + "'; "
                    + "c = join a by $0, b by $0;";
            Util.registerMultiLineQuery(pigServer, script);
            Iterator<Tuple> it = pigServer.openIterator("c");
            assertTrue(it.hasNext());
            // Compared through toString() rather than equals().
            // NOTE(review): presumably because fields loaded without a schema
            // are not typed like the expected constant - confirm.
            assertEquals(expectedResult.toString(), it.next().toString());
            assertFalse(it.hasNext());
        } finally {
            deleteInputFiles(firstInput, secondInput);
        }
    }

    /**
     * Checks alias resolution in the schema of a join result: unique field
     * names can be used bare, an ambiguous bare name is rejected with error
     * code 1025, and {@code relation::field} references resolve the ambiguity.
     */
    @Test
    public void testJoinSchema() throws Exception {
        String[] input1 = {
            "1\t2",
            "2\t3",
            "3\t4"
        };
        String[] input2 = {
            "1\thello",
            "4\tbye",
        };
        String firstInput = null;
        String secondInput = null;
        try {
            firstInput = createInputFile("a.txt", input1);
            secondInput = createInputFile("b.txt", input2);
            Tuple expectedResult = (Tuple) Util.getPigConstant("(1,2,1,'hello',1,2,1,'hello')");

            // with schema
            String script = "a = load '" + Util.encodeEscape(firstInput) + "' as (i:int, j:int); "
                    + "b = load '" + Util.encodeEscape(secondInput) + "' as (k:int, l:chararray); "
                    + "c = join a by $0, b by $0;"
                    + "d = foreach c generate i,j,k,l,a::i as ai,a::j as aj,b::k as bk,b::l as bl;";
            Util.registerMultiLineQuery(pigServer, script);
            assertOnlyTuple(expectedResult, pigServer.openIterator("d"));

            // schema with duplicates: both inputs declare a field named i, and
            // the foreach refers to it without a relation prefix
            script = "a = load '" + Util.encodeEscape(firstInput) + "' as (i:int, j:int); "
                    + "b = load '" + Util.encodeEscape(secondInput) + "' as (i:int, l:chararray); "
                    + "c = join a by $0, b by $0;"
                    + "d = foreach c generate i,j,l,a::i as ai,a::j as aj,b::i as bi,b::l as bl;";
            boolean exceptionThrown = false;
            try {
                Util.registerMultiLineQuery(pigServer, script);
                pigServer.openIterator("d");
            } catch (Exception e) {
                PigException pe = LogUtils.getPigException(e);
                assertEquals(1025, pe.getErrorCode());
                exceptionThrown = true;
            }
            assertTrue(exceptionThrown);

            // schema with duplicates with resolution
            script = "a = load '" + Util.encodeEscape(firstInput) + "' as (i:int, j:int); "
                    + "b = load '" + Util.encodeEscape(secondInput) + "' as (i:int, l:chararray); "
                    + "c = join a by $0, b by $0;"
                    + "d = foreach c generate a::i as ai1,j,b::i as bi1,l,a::i as ai2,a::j as aj2,b::i as bi3,b::l as bl3;";
            Util.registerMultiLineQuery(pigServer, script);
            assertOnlyTuple(expectedResult, pigServer.openIterator("d"));
        } finally {
            deleteInputFiles(firstInput, secondInput);
        }
    }

    /** Left outer join on the first column, results ordered by {@code $1}. */
    @Test
    public void testLeftOuterJoin() throws Exception {
        checkOuterJoin("left", "$1", new String[] {
            "('hello',1,'hello','world')",
            "('bye',2,null,null)",
            "(null,3,null,null)"
        });
    }

    /** Right outer join on the first column, results ordered by {@code $3}. */
    @Test
    public void testRightOuterJoin() throws Exception {
        checkOuterJoin("right", "$3", new String[] {
            "(null,null,null,'evening')",
            "(null,null,'good','morning')",
            "('hello',1,'hello','world')"
        });
    }

    /** Full outer join on the first column, results ordered by {@code $1, $3}. */
    @Test
    public void testFullOuterJoin() throws Exception {
        checkOuterJoin("full", "$1, $3", new String[] {
            "(null,null,null,'evening')",
            "(null,null,'good','morning')",
            "('hello',1,'hello','world')",
            "('bye',2,null,null)",
            "(null,3,null,null)"
        });
    }

    /** Joins on a field nested inside a tuple-typed column. */
    @Test
    public void testJoinTupleFieldKey() throws Exception {
        String[] input1 = {
            "(1,a)",
            "(2,aa)"
        };
        String[] input2 = {
            "(1,b)",
            "(2,bb)"
        };
        String firstInput = null;
        String secondInput = null;
        try {
            firstInput = createInputFile("a.txt", input1);
            secondInput = createInputFile("b.txt", input2);

            String script = "a = load '" + Util.encodeEscape(firstInput) + "' as (a:tuple(a1:int, a2:chararray));"
                    + "b = load '" + Util.encodeEscape(secondInput) + "' as (b:tuple(b1:int, b2:chararray));"
                    + "c = join a by a.a1, b by b.b1;";
            Util.registerMultiLineQuery(pigServer, script);
            Iterator<Tuple> it = pigServer.openIterator("c");

            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
                    new String[] {
                        "((1,'a'),(1,'b'))",
                        "((2,'aa'),(2,'bb'))"
                    });
            Util.checkQueryOutputsAfterSort(it, expectedResults);
        } finally {
            deleteInputFiles(firstInput, secondInput);
        }
    }

    /**
     * Joins on a two-field key where one row of each input has an empty second
     * field. Only the fully populated {@code (2,'aa')} rows are expected to
     * match.
     */
    @Test
    public void testJoinNullTupleFieldKey() throws Exception {
        String[] input1 = {
            "1\t",
            "2\taa"
        };
        String[] input2 = {
            "1\t",
            "2\taa"
        };
        String firstInput = null;
        String secondInput = null;
        try {
            firstInput = createInputFile("a.txt", input1);
            secondInput = createInputFile("b.txt", input2);

            String script = "a = load '" + Util.encodeEscape(firstInput) + "' as (a1:int, a2:chararray);"
                    + "b = load '" + Util.encodeEscape(secondInput) + "' as (b1:int, b2:chararray);"
                    + "c = join a by (a1, a2), b by (b1, b2);";
            Util.registerMultiLineQuery(pigServer, script);
            Iterator<Tuple> it = pigServer.openIterator("c");

            List<Tuple> expectedResults = Util
                    .getTuplesFromConstantTupleStrings(new String[] { "(2,'aa',2,'aa')" });
            Util.checkQueryOutputs(it, expectedResults);
        } finally {
            deleteInputFiles(firstInput, secondInput);
        }
    }

    /**
     * Shared body of the left/right/full outer join tests.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>the {@code <joinSide> outer} script with schemas is parsed, run, and
     *       its ordered output compared row by row with {@code expectedRows};</li>
     *   <li>the same join without schemas is registered; if that throws, the
     *       error code must be 1105 (a registration that succeeds is accepted
     *       as well);</li>
     *   <li>the short form without the {@code outer} keyword is only parsed.</li>
     * </ol>
     *
     * @param joinSide {@code "left"}, {@code "right"} or {@code "full"}
     * @param orderKeys columns of the {@code order c by ...} statement
     * @param expectedRows constant-tuple strings of the expected output, in order
     * @throws Exception if a query fails unexpectedly or cleanup fails
     */
    private void checkOuterJoin(String joinSide, String orderKeys, String[] expectedRows) throws Exception {
        String firstInput = null;
        String secondInput = null;
        try {
            firstInput = createInputFile("a.txt", LEFT_INPUT_ROWS);
            secondInput = createInputFile("b.txt", RIGHT_INPUT_ROWS);
            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(expectedRows);

            String loadsWithSchema =
                    "a = load '" + Util.encodeEscape(firstInput) + "' as (n:chararray, a:int); "
                    + "b = load '" + Util.encodeEscape(secondInput) + "' as (n:chararray, m:chararray); ";
            String orderStatement = "d = order c by " + orderKeys + ";";

            // with schema, explicit "outer" keyword: ensure we parse correctly,
            // then run the query and test the results
            String outerScript = loadsWithSchema
                    + "c = join a by $0 " + joinSide + " outer, b by $0;"
                    + orderStatement;
            Util.buildLp(pigServer, outerScript);
            Util.registerMultiLineQuery(pigServer, outerScript);
            Iterator<Tuple> it = pigServer.openIterator("d");
            int counter = 0;
            while (it.hasNext()) {
                Tuple actual = it.next();
                // Fail with a readable message instead of an
                // IndexOutOfBoundsException when the query returns too many rows.
                assertTrue("unexpected extra row: " + actual, counter < expectedResults.size());
                assertEquals(expectedResults.get(counter), actual);
                counter++;
            }
            assertEquals(expectedResults.size(), counter);

            // without schema
            String schemalessScript = "a = load '" + Util.encodeEscape(firstInput) + "'; "
                    + "b = load '" + Util.encodeEscape(secondInput) + "'; "
                    + "c = join a by $0 " + joinSide + " outer, b by $0;";
            try {
                Util.registerMultiLineQuery(pigServer, schemalessScript);
            } catch (Exception e) {
                // Only the error code is checked; no failure is raised when the
                // registration goes through.
                PigException pe = LogUtils.getPigException(e);
                assertEquals(1105, pe.getErrorCode());
            }

            // with schema, "outer" keyword omitted: ensure we parse correctly
            // (results are tested only once, above)
            String shortScript = loadsWithSchema
                    + "c = join a by $0 " + joinSide + ", b by $0;"
                    + orderStatement;
            Util.buildLp(pigServer, shortScript);
        } finally {
            deleteInputFiles(firstInput, secondInput);
        }
    }

    /** Asserts that {@code results} yields exactly one tuple equal to {@code expected}. */
    private static void assertOnlyTuple(Tuple expected, Iterator<Tuple> results) {
        assertTrue(results.hasNext());
        assertEquals(expected, results.next());
        assertFalse(results.hasNext());
    }

    /**
     * Deletes every non-null file name via {@link #deleteInputFile(String)}.
     * Meant for {@code finally} blocks, where a name is still {@code null} if
     * creating that file failed. An exception thrown by a deletion propagates
     * and stops the remaining deletions.
     */
    private void deleteInputFiles(String... fileNames) throws Exception {
        for (String fileName : fileNames) {
            if (fileName != null) {
                deleteInputFile(fileName);
            }
        }
    }
}