| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.hcatalog.pig; |
| |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.TreeMap; |
| |
| import junit.framework.TestCase; |
| |
| import org.apache.hcatalog.MiniCluster; |
| import org.apache.pig.ExecType; |
| import org.apache.pig.PigServer; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.util.UDFContext; |
| |
| /** |
| * |
| * TestHCatEximLoader. Assumes Exim storer is working well |
| * |
| */ |
| public class TestHCatEximLoader extends TestCase { |
| |
| private static final String NONPART_TABLE = "junit_unparted"; |
| private static final String PARTITIONED_TABLE = "junit_parted"; |
| private static MiniCluster cluster = MiniCluster.buildCluster(); |
| |
| private static final String dataLocation = "/tmp/data"; |
| private static String fqdataLocation; |
| private static final String exportLocation = "/tmp/export"; |
| private static String fqexportLocation; |
| |
| private static Properties props; |
| |
| private void cleanup() throws IOException { |
| MiniCluster.deleteFile(cluster, dataLocation); |
| MiniCluster.deleteFile(cluster, exportLocation); |
| } |
| |
| @Override |
| protected void setUp() throws Exception { |
| props = new Properties(); |
| props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name")); |
| System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() |
| + ", fs.default.name : " + props.getProperty("fs.default.name")); |
| fqdataLocation = cluster.getProperties().getProperty("fs.default.name") + dataLocation; |
| fqexportLocation = cluster.getProperties().getProperty("fs.default.name") + exportLocation; |
| System.out.println("FQ Data Location :" + fqdataLocation); |
| System.out.println("FQ Export Location :" + fqexportLocation); |
| cleanup(); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| cleanup(); |
| } |
| |
| private void populateDataFile() throws IOException { |
| MiniCluster.deleteFile(cluster, dataLocation); |
| String[] input = new String[] { |
| "237,Krishna,01/01/1990,M,IN,TN", |
| "238,Kalpana,01/01/2000,F,IN,KA", |
| "239,Satya,01/01/2001,M,US,TN", |
| "240,Kavya,01/01/2002,F,US,KA" |
| }; |
| MiniCluster.createInputFile(cluster, dataLocation, input); |
| } |
| |
| private static class EmpDetail { |
| String name; |
| String dob; |
| String mf; |
| String country; |
| String state; |
| } |
| |
| private void assertEmpDetail(Tuple t, Map<Integer, EmpDetail> eds) throws ExecException { |
| assertNotNull(t); |
| assertEquals(6, t.size()); |
| |
| assertTrue(t.get(0).getClass() == Integer.class); |
| assertTrue(t.get(1).getClass() == String.class); |
| assertTrue(t.get(2).getClass() == String.class); |
| assertTrue(t.get(3).getClass() == String.class); |
| assertTrue(t.get(4).getClass() == String.class); |
| assertTrue(t.get(5).getClass() == String.class); |
| |
| EmpDetail ed = eds.remove(t.get(0)); |
| assertNotNull(ed); |
| |
| assertEquals(ed.name, t.get(1)); |
| assertEquals(ed.dob, t.get(2)); |
| assertEquals(ed.mf, t.get(3)); |
| assertEquals(ed.country, t.get(4)); |
| assertEquals(ed.state, t.get(5)); |
| } |
| |
| private void addEmpDetail(Map<Integer, EmpDetail> empDetails, int id, String name, |
| String dob, String mf, String country, String state) { |
| EmpDetail ed = new EmpDetail(); |
| ed.name = name; |
| ed.dob = dob; |
| ed.mf = mf; |
| ed.country = country; |
| ed.state = state; |
| empDetails.put(id, ed); |
| } |
| |
| |
| |
| private void assertEmpDetail(Tuple t, Integer id, String name, String dob, String mf) |
| throws ExecException { |
| assertNotNull(t); |
| assertEquals(4, t.size()); |
| assertTrue(t.get(0).getClass() == Integer.class); |
| assertTrue(t.get(1).getClass() == String.class); |
| assertTrue(t.get(2).getClass() == String.class); |
| assertTrue(t.get(3).getClass() == String.class); |
| |
| assertEquals(id, t.get(0)); |
| assertEquals(name, t.get(1)); |
| assertEquals(dob, t.get(2)); |
| assertEquals(mf, t.get(3)); |
| } |
| |
| private void assertEmpDetail(Tuple t, String mf, String name) |
| throws ExecException { |
| assertNotNull(t); |
| assertEquals(2, t.size()); |
| assertTrue(t.get(0).getClass() == String.class); |
| assertTrue(t.get(1).getClass() == String.class); |
| |
| assertEquals(mf, t.get(0)); |
| assertEquals(name, t.get(1)); |
| } |
| |
| |
| |
| public void testLoadNonPartTable() throws Exception { |
| populateDataFile(); |
| { |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| server.setBatchOn(); |
| server |
| .registerQuery("A = load '" |
| + fqdataLocation |
| + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); |
| server.registerQuery("store A into '" + NONPART_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');"); |
| server.executeBatch(); |
| } |
| { |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| |
| server |
| .registerQuery("A = load '" |
| + fqexportLocation |
| + "' using org.apache.hcatalog.pig.HCatEximLoader();"); |
| Iterator<Tuple> XIter = server.openIterator("A"); |
| assertTrue(XIter.hasNext()); |
| Tuple t = XIter.next(); |
| assertEmpDetail(t, 237, "Krishna", "01/01/1990", "M"); |
| assertTrue(XIter.hasNext()); |
| t = XIter.next(); |
| assertEmpDetail(t, 238, "Kalpana", "01/01/2000", "F"); |
| assertTrue(XIter.hasNext()); |
| t = XIter.next(); |
| assertEmpDetail(t, 239, "Satya", "01/01/2001", "M"); |
| assertTrue(XIter.hasNext()); |
| t = XIter.next(); |
| assertEmpDetail(t, 240, "Kavya", "01/01/2002", "F"); |
| assertFalse(XIter.hasNext()); |
| } |
| } |
| |
| public void testLoadNonPartProjection() throws Exception { |
| populateDataFile(); |
| { |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| server.setBatchOn(); |
| server |
| .registerQuery("A = load '" |
| + fqdataLocation |
| + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); |
| server.registerQuery("store A into '" + NONPART_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');"); |
| server.executeBatch(); |
| } |
| { |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| |
| server |
| .registerQuery("A = load '" |
| + fqexportLocation |
| + "' using org.apache.hcatalog.pig.HCatEximLoader();"); |
| server.registerQuery("B = foreach A generate emp_sex, emp_name;"); |
| |
| Iterator<Tuple> XIter = server.openIterator("B"); |
| assertTrue(XIter.hasNext()); |
| Tuple t = XIter.next(); |
| assertEmpDetail(t, "M", "Krishna"); |
| assertTrue(XIter.hasNext()); |
| t = XIter.next(); |
| assertEmpDetail(t, "F", "Kalpana"); |
| assertTrue(XIter.hasNext()); |
| t = XIter.next(); |
| assertEmpDetail(t, "M", "Satya"); |
| assertTrue(XIter.hasNext()); |
| t = XIter.next(); |
| assertEmpDetail(t, "F", "Kavya"); |
| assertFalse(XIter.hasNext()); |
| } |
| } |
| |
| |
| public void testLoadMultiPartTable() throws Exception { |
| { |
| populateDataFile(); |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| server.setBatchOn(); |
| server |
| .registerQuery("A = load '" |
| + fqdataLocation + |
| "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);" |
| ); |
| server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';"); |
| server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';"); |
| server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';"); |
| server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';"); |
| server.registerQuery("store INTN into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + |
| "', 'emp_country=in,emp_state=tn');"); |
| server.registerQuery("store INKA into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + |
| "', 'emp_country=in,emp_state=ka');"); |
| server.registerQuery("store USTN into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + |
| "', 'emp_country=us,emp_state=tn');"); |
| server.registerQuery("store USKA into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + |
| "', 'emp_country=us,emp_state=ka');"); |
| server.executeBatch(); |
| } |
| { |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| |
| server |
| .registerQuery("A = load '" |
| + fqexportLocation |
| + "' using org.apache.hcatalog.pig.HCatEximLoader() " |
| //+ "as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);"); |
| + ";"); |
| |
| Iterator<Tuple> XIter = server.openIterator("A"); |
| |
| Map<Integer, EmpDetail> empDetails = new TreeMap<Integer, EmpDetail>(); |
| addEmpDetail(empDetails, 237, "Krishna", "01/01/1990", "M", "in", "tn"); |
| addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka"); |
| addEmpDetail(empDetails, 239, "Satya", "01/01/2001", "M", "us", "tn"); |
| addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka"); |
| |
| while(XIter.hasNext()) { |
| Tuple t = XIter.next(); |
| assertNotSame(0, empDetails.size()); |
| assertEmpDetail(t, empDetails); |
| } |
| assertEquals(0, empDetails.size()); |
| } |
| } |
| |
| public void testLoadMultiPartFilter() throws Exception { |
| { |
| populateDataFile(); |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| server.setBatchOn(); |
| server |
| .registerQuery("A = load '" |
| + fqdataLocation + |
| "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);" |
| ); |
| server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';"); |
| server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';"); |
| server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';"); |
| server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';"); |
| server.registerQuery("store INTN into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + |
| "', 'emp_country=in,emp_state=tn');"); |
| server.registerQuery("store INKA into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + |
| "', 'emp_country=in,emp_state=ka');"); |
| server.registerQuery("store USTN into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + |
| "', 'emp_country=us,emp_state=tn');"); |
| server.registerQuery("store USKA into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + |
| "', 'emp_country=us,emp_state=ka');"); |
| server.executeBatch(); |
| } |
| { |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| |
| server |
| .registerQuery("A = load '" |
| + fqexportLocation |
| + "' using org.apache.hcatalog.pig.HCatEximLoader() " |
| + ";"); |
| server.registerQuery("B = filter A by emp_state == 'ka';"); |
| |
| Iterator<Tuple> XIter = server.openIterator("B"); |
| |
| Map<Integer, EmpDetail> empDetails = new TreeMap<Integer, EmpDetail>(); |
| addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka"); |
| addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka"); |
| |
| while(XIter.hasNext()) { |
| Tuple t = XIter.next(); |
| assertNotSame(0, empDetails.size()); |
| assertEmpDetail(t, empDetails); |
| } |
| assertEquals(0, empDetails.size()); |
| } |
| } |
| |
| |
| } |