blob: 082d72376b1ca345f66b0638c55f35468483e80d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hcatalog.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import junit.framework.TestCase;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hcatalog.MiniCluster;
import org.apache.hcatalog.data.HCatDataCheckUtil;
import org.apache.hcatalog.mapred.HCatMapredInputFormat;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.impl.util.UDFContext;
/**
 * End-to-end test verifying that data stored through the HCatalog storage
 * handler (written via Pig's HCatStorer) can be read back from Hive, for
 * both a partitioned and a non-partitioned table, with and without
 * partition-key and non-partition-key filters.
 *
 * <p>Requires a {@link MiniCluster}; queries are issued through the Hive
 * {@link Driver} and result rows are compared as formatted strings.
 */
public class TestHiveHCatInputFormat extends TestCase {
    private static MiniCluster cluster = MiniCluster.buildCluster();
    private static Driver driver;

    // Table and staging-file names used by this test; dropped/removed in cleanup().
    private final String PTNED_TABLE = "junit_testhiveinputintegration_ptni";
    private final String UNPTNED_TABLE = "junit_testhiveinputintegration_noptn";
    private final String basicFile = "/tmp/" + PTNED_TABLE + ".file";

    // Shared storage clause wiring the RCFile input/output drivers; previously
    // duplicated verbatim for both CREATE TABLE statements.
    private static final String STORED_BY_CLAUSE =
        "stored by '" + HCatStorageHandlerImpl.class.getName() + "' tblproperties"
        + "('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',"
        + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";

    /**
     * Loads a generated data file into both tables via Pig, then runs matched
     * pairs of Hive SELECTs ("select *" vs. explicit column lists) and asserts
     * each pair returns identical rows.
     */
    public void testFromHive() throws Exception {
        if (driver == null) {
            driver = HCatDataCheckUtil.instantiateDriver(cluster);
        }
        Properties props = new Properties();
        props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
        String basicFileFullName = cluster.getProperties().getProperty("fs.default.name") + basicFile;
        cleanup();

        // create source data file
        HCatDataCheckUtil.generateDataFile(cluster, basicFile);

        String createPtnedTable = "(j int, s string) partitioned by (i int) " + STORED_BY_CLAUSE;
        HCatDataCheckUtil.createTable(driver, PTNED_TABLE, createPtnedTable);

        String createUnptnedTable = "(i int, j int, s string) " + STORED_BY_CLAUSE;
        HCatDataCheckUtil.createTable(driver, UNPTNED_TABLE, createUnptnedTable);

        // Dump extended descriptions to stderr for debugging failed runs.
        driver.run("describe extended " + UNPTNED_TABLE);
        ArrayList<String> des_values = new ArrayList<String>();
        driver.getResults(des_values);
        for (String s : des_values) {
            System.err.println("du:" + s);
        }
        driver.run("describe extended " + PTNED_TABLE);
        ArrayList<String> des2_values = new ArrayList<String>();
        driver.getResults(des2_values);
        for (String s : des2_values) {
            System.err.println("dp:" + s);
        }

        // use pig to read from source file and put into this table
        PigServer server = new PigServer(ExecType.LOCAL, props);
        UDFContext.getUDFContext().setClientSystemProps();
        server.setBatchOn();
        server.registerQuery("A = load '" + basicFileFullName + "' as (i:int, j:int, s:chararray);");
        server.registerQuery("store A into '" + UNPTNED_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
        server.executeBatch();
        server.setBatchOn();
        server.registerQuery("A = load '" + basicFileFullName + "' as (i:int, j:int, s:chararray);");
        server.registerQuery("store A into '" + PTNED_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
        server.executeBatch();

        // Partitioned table (partitioned by i):
        //   select * from tbl;                  vs  select j,s,i from tbl;
        //   select * from tbl where i = 3;      vs  select j,s,i from tbl where i = 3;
        //   select * from tbl where j = 28;     vs  select j,s,i from tbl where j = 28;
        ArrayList<String> p_select_star_nofilter = HCatDataCheckUtil.formattedRun(driver,
            "p_select_star_nofilter", "select * from " + PTNED_TABLE);
        ArrayList<String> p_select_named_nofilter = HCatDataCheckUtil.formattedRun(driver,
            "p_select_named_nofilter", "select j,s,i from " + PTNED_TABLE);
        assertDataIdentical(p_select_star_nofilter, p_select_named_nofilter, 50);

        ArrayList<String> p_select_star_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
            "p_select_star_ptnfilter", "select * from " + PTNED_TABLE + " where i = 3");
        ArrayList<String> p_select_named_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
            "p_select_named_ptnfilter", "select j,s,i from " + PTNED_TABLE + " where i = 3");
        assertDataIdentical(p_select_star_ptnfilter, p_select_named_ptnfilter, 10);

        ArrayList<String> select_star_nonptnfilter = HCatDataCheckUtil.formattedRun(driver,
            "select_star_nonptnfilter", "select * from " + PTNED_TABLE + " where j = 28");
        ArrayList<String> select_named_nonptnfilter = HCatDataCheckUtil.formattedRun(driver,
            "select_named_nonptnfilter", "select j,s,i from " + PTNED_TABLE + " where j = 28");
        assertDataIdentical(select_star_nonptnfilter, select_named_nonptnfilter, 1);

        // Non-partitioned table:
        //   select * from tbl;                  vs  select i,j,s from tbl;
        //   select * from tbl where i = 3;      vs  select i,j,s from tbl where i = 3;
        //   select j,s,i (both tables, sorted)  — cross-table comparisons need sorting.
        ArrayList<String> select_star_nofilter = HCatDataCheckUtil.formattedRun(driver,
            "select_star_nofilter", "select * from " + UNPTNED_TABLE); //i,j,s select * order is diff for unptn
        ArrayList<String> select_ijs_nofilter = HCatDataCheckUtil.formattedRun(driver,
            "select_ijs_nofilter", "select i,j,s from " + UNPTNED_TABLE);
        assertDataIdentical(select_star_nofilter, select_ijs_nofilter, 50);

        ArrayList<String> select_star_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
            "select_star_ptnfilter", "select * from " + UNPTNED_TABLE + " where i = 3"); //i,j,s
        ArrayList<String> select_ijs_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
            "select_ijs_ptnfilter", "select i,j,s from " + UNPTNED_TABLE + " where i = 3");
        assertDataIdentical(select_star_ptnfilter, select_ijs_ptnfilter, 10);

        ArrayList<String> select_jsi_nofilter = HCatDataCheckUtil.formattedRun(driver,
            "select_jsi_nofilter", "select j,s,i from " + UNPTNED_TABLE);
        assertDataIdentical(p_select_named_nofilter, select_jsi_nofilter, 50, true);

        ArrayList<String> select_jsi_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
            "select_jsi_ptnfilter", "select j,s,i from " + UNPTNED_TABLE + " where i = 3");
        assertDataIdentical(p_select_named_ptnfilter, select_jsi_ptnfilter, 10, true);
    }

    /**
     * Asserts both result sets contain exactly {@code numRecords} rows with
     * identical content, compared in their original order.
     */
    private void assertDataIdentical(List<String> result1,
                                     List<String> result2, int numRecords) {
        assertDataIdentical(result1, result2, numRecords, false);
    }

    /**
     * Asserts both result sets contain exactly {@code numRecords} rows with
     * identical content. When {@code doSort} is true, both lists are sorted
     * in place first, so only set equality is checked (used when comparing
     * results across different tables, where row order is not comparable).
     *
     * <p>Bug fix: {@code doSort} was previously ignored and both lists were
     * always sorted, making the sorted and unsorted overloads behave the same.
     */
    private void assertDataIdentical(List<String> result1,
                                     List<String> result2, int numRecords, boolean doSort) {
        assertEquals(numRecords, result1.size());
        assertEquals(numRecords, result2.size());
        if (doSort) {
            Collections.sort(result1);
            Collections.sort(result2);
        }
        for (int i = 0; i < numRecords; i++) {
            assertEquals(result1.get(i), result2.get(i));
        }
    }

    /**
     * Removes the staging data file and drops both test tables so each run
     * starts from a clean state.
     *
     * @throws IOException if the staging file cannot be deleted
     * @throws CommandNeedRetryException if a DROP TABLE must be retried
     */
    private void cleanup() throws IOException, CommandNeedRetryException {
        MiniCluster.deleteFile(cluster, basicFile);
        HCatDataCheckUtil.dropTable(driver, PTNED_TABLE);
        HCatDataCheckUtil.dropTable(driver, UNPTNED_TABLE);
    }
}