blob: 238edb2ae4df61f6891c7f1b93e9a675c232bdbe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hcatalog.pig;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import junit.framework.TestCase;
import org.apache.hcatalog.MiniCluster;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
/**
 * Tests for {@code HCatEximLoader}.
 *
 * <p>Every test first writes a small employee data set through {@code HCatEximStorer}
 * and then reads the export back through the loader, so these tests assume that the
 * Exim storer is working well.
 */
public class TestHCatEximLoader extends TestCase {

    private static final String NONPART_TABLE = "junit_unparted";
    private static final String PARTITIONED_TABLE = "junit_parted";
    private static final String DATA_LOCATION = "/tmp/data";
    private static final String EXPORT_LOCATION = "/tmp/export";

    /** Cluster shared by every test method of this class. */
    private static final MiniCluster cluster = MiniCluster.buildCluster();

    /** Pig client properties; rebuilt for each test in {@link #setUp()}. */
    private Properties props;
    /** {@code fs.default.name} of the cluster followed by {@link #DATA_LOCATION}. */
    private String fqDataLocation;
    /** {@code fs.default.name} of the cluster followed by {@link #EXPORT_LOCATION}. */
    private String fqExportLocation;

    /** Expected non-key fields of one employee row, keyed elsewhere by employee id. */
    private static class EmpDetail {
        final String name;
        final String dob;
        final String mf;
        final String country;
        final String state;

        EmpDetail(String name, String dob, String mf, String country, String state) {
            this.name = name;
            this.dob = dob;
            this.mf = mf;
            this.country = country;
            this.state = state;
        }
    }

    /** Deletes the input data file and the export location from the cluster. */
    private void cleanup() throws IOException {
        MiniCluster.deleteFile(cluster, DATA_LOCATION);
        MiniCluster.deleteFile(cluster, EXPORT_LOCATION);
    }

    /**
     * Builds the Pig properties and the fully qualified data/export locations from the
     * cluster's {@code fs.default.name}, then removes leftovers of earlier runs.
     */
    @Override
    protected void setUp() throws Exception {
        super.setUp();
        String defaultFs = cluster.getProperties().getProperty("fs.default.name");
        props = new Properties();
        props.setProperty("fs.default.name", defaultFs);
        System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
            + ", fs.default.name : " + props.getProperty("fs.default.name"));
        fqDataLocation = defaultFs + DATA_LOCATION;
        fqExportLocation = defaultFs + EXPORT_LOCATION;
        System.out.println("FQ Data Location :" + fqDataLocation);
        System.out.println("FQ Export Location :" + fqExportLocation);
        cleanup();
    }

    /** Removes the files the test created, even if the test itself failed. */
    @Override
    protected void tearDown() throws Exception {
        try {
            cleanup();
        } finally {
            super.tearDown();
        }
    }

    /** (Re)creates the comma-separated input file holding the four employee rows. */
    private void populateDataFile() throws IOException {
        MiniCluster.deleteFile(cluster, DATA_LOCATION);
        String[] input = new String[] {
            "237,Krishna,01/01/1990,M,IN,TN",
            "238,Kalpana,01/01/2000,F,IN,KA",
            "239,Satya,01/01/2001,M,US,TN",
            "240,Kavya,01/01/2002,F,US,KA"
        };
        MiniCluster.createInputFile(cluster, DATA_LOCATION, input);
    }

    /**
     * Creates a local-mode Pig server using the per-test properties and refreshes the
     * client system properties held by the UDF context.
     *
     * @return a new Pig server; batch mode is not enabled
     */
    private PigServer newPigServer() throws Exception {
        PigServer server = new PigServer(ExecType.LOCAL, props);
        UDFContext.getUDFContext().setClientSystemProps();
        return server;
    }

    /**
     * Writes the input file and exports its first four columns as the non-partitioned
     * table through {@code HCatEximStorer}.
     */
    private void storeNonPartitionedTable() throws Exception {
        populateDataFile();
        PigServer server = newPigServer();
        server.setBatchOn();
        server.registerQuery("A = load '" + fqDataLocation
            + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
        server.registerQuery("store A into '" + NONPART_TABLE
            + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqExportLocation + "');");
        server.executeBatch();
    }

    /**
     * Writes the input file, splits it by country and state, and exports each slice as
     * one partition of the partitioned table through {@code HCatEximStorer}.
     */
    private void storePartitionedTable() throws Exception {
        populateDataFile();
        PigServer server = newPigServer();
        server.setBatchOn();
        server.registerQuery("A = load '" + fqDataLocation
            + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);");
        // All filters are registered before any store, as in the original test sequence.
        server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';");
        server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';");
        server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';");
        server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';");
        registerPartitionStore(server, "INTN", "emp_country=in,emp_state=tn");
        registerPartitionStore(server, "INKA", "emp_country=in,emp_state=ka");
        registerPartitionStore(server, "USTN", "emp_country=us,emp_state=tn");
        registerPartitionStore(server, "USKA", "emp_country=us,emp_state=ka");
        server.executeBatch();
    }

    /**
     * Registers a store of {@code alias} into the partitioned table.
     *
     * @param server        Pig server collecting the batch
     * @param alias         Pig alias whose rows are stored
     * @param partitionSpec partition specification handed to {@code HCatEximStorer}
     */
    private void registerPartitionStore(PigServer server, String alias, String partitionSpec)
        throws Exception {
        server.registerQuery("store " + alias + " into '" + PARTITIONED_TABLE
            + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqExportLocation
            + "', '" + partitionSpec + "');");
    }

    /** Returns the next tuple, failing the test if the iterator is exhausted. */
    private static Tuple nextTuple(Iterator<Tuple> rows) {
        assertTrue("expected another tuple from the loader", rows.hasNext());
        return rows.next();
    }

    /**
     * Asserts that the tuple is non-null, has exactly as many fields as types given, and
     * that each field is a non-null instance of exactly the corresponding class.
     */
    private static void assertFieldTypes(Tuple t, Class<?>... expectedTypes)
        throws ExecException {
        assertNotNull(t);
        assertEquals(expectedTypes.length, t.size());
        for (int i = 0; i < expectedTypes.length; i++) {
            Object field = t.get(i);
            assertNotNull("field " + i + " is null", field);
            assertEquals("type of field " + i, expectedTypes[i], field.getClass());
        }
    }

    /**
     * Asserts that a six-field tuple matches the expectation registered under its
     * employee id and removes that expectation, so a repeated or unknown id fails.
     */
    private void assertEmpDetail(Tuple t, Map<Integer, EmpDetail> expected)
        throws ExecException {
        assertFieldTypes(t, Integer.class, String.class, String.class, String.class,
            String.class, String.class);
        Object id = t.get(0);
        EmpDetail ed = expected.remove(id);
        assertNotNull("unexpected or repeated emp_id " + id, ed);
        assertEquals(ed.name, t.get(1));
        assertEquals(ed.dob, t.get(2));
        assertEquals(ed.mf, t.get(3));
        assertEquals(ed.country, t.get(4));
        assertEquals(ed.state, t.get(5));
    }

    /** Registers the expected field values for the employee with the given id. */
    private void addEmpDetail(Map<Integer, EmpDetail> empDetails, int id, String name,
        String dob, String mf, String country, String state) {
        empDetails.put(id, new EmpDetail(name, dob, mf, country, state));
    }

    /** Asserts that a four-field tuple holds exactly the given id, name, dob and sex. */
    private void assertEmpDetail(Tuple t, Integer id, String name, String dob, String mf)
        throws ExecException {
        assertFieldTypes(t, Integer.class, String.class, String.class, String.class);
        assertEquals(id, t.get(0));
        assertEquals(name, t.get(1));
        assertEquals(dob, t.get(2));
        assertEquals(mf, t.get(3));
    }

    /** Asserts that a two-field tuple holds exactly the given sex and name, in that order. */
    private void assertEmpDetail(Tuple t, String mf, String name) throws ExecException {
        assertFieldTypes(t, String.class, String.class);
        assertEquals(mf, t.get(0));
        assertEquals(name, t.get(1));
    }

    /**
     * Drains the iterator, checking every tuple against the expectations, and then
     * asserts that every expected employee was returned. Order is not significant.
     */
    private void assertLoadedExactly(Iterator<Tuple> rows, Map<Integer, EmpDetail> expected)
        throws ExecException {
        while (rows.hasNext()) {
            Tuple t = rows.next();
            // Value check; the previous assertNotSame(0, size) compared boxed references.
            assertFalse("loader returned more tuples than expected", expected.isEmpty());
            assertEmpDetail(t, expected);
        }
        assertTrue("employees never returned by the loader: " + expected.keySet(),
            expected.isEmpty());
    }

    /** Loads the non-partitioned export and expects the four rows in input order. */
    public void testLoadNonPartTable() throws Exception {
        storeNonPartitionedTable();

        PigServer server = newPigServer();
        server.registerQuery("A = load '" + fqExportLocation
            + "' using org.apache.hcatalog.pig.HCatEximLoader();");
        Iterator<Tuple> rows = server.openIterator("A");
        assertEmpDetail(nextTuple(rows), 237, "Krishna", "01/01/1990", "M");
        assertEmpDetail(nextTuple(rows), 238, "Kalpana", "01/01/2000", "F");
        assertEmpDetail(nextTuple(rows), 239, "Satya", "01/01/2001", "M");
        assertEmpDetail(nextTuple(rows), 240, "Kavya", "01/01/2002", "F");
        assertFalse("loader returned more tuples than were stored", rows.hasNext());
    }

    /** Loads the non-partitioned export and projects {@code emp_sex, emp_name}. */
    public void testLoadNonPartProjection() throws Exception {
        storeNonPartitionedTable();

        PigServer server = newPigServer();
        server.registerQuery("A = load '" + fqExportLocation
            + "' using org.apache.hcatalog.pig.HCatEximLoader();");
        server.registerQuery("B = foreach A generate emp_sex, emp_name;");
        Iterator<Tuple> rows = server.openIterator("B");
        assertEmpDetail(nextTuple(rows), "M", "Krishna");
        assertEmpDetail(nextTuple(rows), "F", "Kalpana");
        assertEmpDetail(nextTuple(rows), "M", "Satya");
        assertEmpDetail(nextTuple(rows), "F", "Kavya");
        assertFalse("loader returned more tuples than were stored", rows.hasNext());
    }

    /**
     * Loads the partitioned export and expects all four rows in any order. The expected
     * country/state values are the lower-case ones from the partition specifications,
     * not the upper-case ones in the input file.
     */
    public void testLoadMultiPartTable() throws Exception {
        storePartitionedTable();

        PigServer server = newPigServer();
        server.registerQuery("A = load '" + fqExportLocation
            + "' using org.apache.hcatalog.pig.HCatEximLoader() ;");
        Iterator<Tuple> rows = server.openIterator("A");
        Map<Integer, EmpDetail> empDetails = new TreeMap<Integer, EmpDetail>();
        addEmpDetail(empDetails, 237, "Krishna", "01/01/1990", "M", "in", "tn");
        addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka");
        addEmpDetail(empDetails, 239, "Satya", "01/01/2001", "M", "us", "tn");
        addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka");
        assertLoadedExactly(rows, empDetails);
    }

    /** Loads the partitioned export filtered on {@code emp_state == 'ka'}. */
    public void testLoadMultiPartFilter() throws Exception {
        storePartitionedTable();

        PigServer server = newPigServer();
        server.registerQuery("A = load '" + fqExportLocation
            + "' using org.apache.hcatalog.pig.HCatEximLoader() ;");
        server.registerQuery("B = filter A by emp_state == 'ka';");
        Iterator<Tuple> rows = server.openIterator("B");
        Map<Integer, EmpDetail> empDetails = new TreeMap<Integer, EmpDetail>();
        addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka");
        addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka");
        assertLoadedExactly(rows, empDetails);
    }
}