/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hcatalog.pig;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.Map;

import junit.framework.TestCase;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.pig.HCatLoader;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.thrift.TException;
/**
 * End-to-end tests for reading and writing HCatalog-managed tables through
 * Pig: plain load/store of partitioned TEXTFILE tables via {@link HCatLoader}
 * and {@code HCatStorer}, multi-store scripts, and custom loader/storer
 * arguments configured through partition and table properties.
 */
public class TestPigStorageDriver extends TestCase {

  private HiveConf hcatConf;
  private Driver hcatDriver;
  private HiveMetaStoreClient msc;

  // Local-filesystem locations used by the tests. warehouseDir must match the
  // metastore warehouse directory configured for the JUnit run.
  private static final String tblLocation = "/tmp/test_pig/data";
  // Any readable file in the working directory serves as test data.
  private static final String anyExistingFileInCurDir = "ivy.xml";
  private static final String warehouseDir = "/tmp/hcat_junit_warehouse";

  /**
   * Creates an embedded Hive driver and metastore client with the HCatalog
   * semantic analyzer installed, and starts a CLI session.
   */
  @Override
  protected void setUp() throws Exception {
    hcatConf = new HiveConf(this.getClass());
    // Disable execution hooks and concurrency support; they are irrelevant to
    // these tests and would only slow them down.
    hcatConf.set(ConfVars.PREEXECHOOKS.varname, "");
    hcatConf.set(ConfVars.POSTEXECHOOKS.varname, "");
    hcatConf.set(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
    hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName());
    hcatDriver = new Driver(hcatConf);
    msc = new HiveMetaStoreClient(hcatConf);
    SessionState.start(new CliSessionState(hcatConf));
    super.setUp();
  }

  @Override
  protected void tearDown() throws Exception {
    super.tearDown();
  }

  /**
   * Loads a partitioned TEXTFILE table through HCatLoader and verifies the
   * tuples against the fact file; then stores through HCatStorer into new
   * partitions (single store and multi-store) and verifies the written data
   * by reading the partition directories back with PigStorage.
   */
  public void testPigStorageDriver() throws IOException, CommandNeedRetryException {
    String fsLoc = hcatConf.get("fs.default.name");
    Path tblPath = new Path(fsLoc, tblLocation);
    String tblName = "junit_pigstorage";

    tblPath.getFileSystem(hcatConf).copyFromLocalFile(new Path(anyExistingFileInCurDir), tblPath);

    // Best-effort cleanup from a previous run; failure is acceptable here.
    hcatDriver.run("drop table " + tblName);

    CommandProcessorResponse resp;
    String createTable =
        "create table " + tblName + " (a string) partitioned by (b string) stored as TEXTFILE";
    resp = hcatDriver.run(createTable);
    assertEquals(0, resp.getResponseCode());
    assertNull(resp.getErrorMessage());

    resp = hcatDriver.run("alter table " + tblName + " add partition (b='2010-10-10') location '"
        + new Path(fsLoc, "/tmp/test_pig") + "'");
    assertEquals(0, resp.getResponseCode());
    assertNull(resp.getErrorMessage());

    resp = hcatDriver.run(
        "alter table " + tblName + " partition (b='2010-10-10') set fileformat TEXTFILE");
    assertEquals(0, resp.getResponseCode());
    assertNull(resp.getErrorMessage());

    resp = hcatDriver.run("desc extended " + tblName + " partition (b='2010-10-10')");
    assertEquals(0, resp.getResponseCode());
    assertNull(resp.getErrorMessage());

    PigServer server = new PigServer(ExecType.LOCAL, hcatConf.getAllProperties());
    UDFContext.getUDFContext().setClientSystemProps();

    // Read through HCatLoader: expect (data, partition-key) two-column tuples.
    server.registerQuery(" a = load '" + tblName + "' using " + HCatLoader.class.getName() + ";");
    Iterator<Tuple> itr = server.openIterator("a");
    boolean result = compareWithFile(itr, anyExistingFileInCurDir, 2, "2010-10-10", null);
    assertTrue(result);

    // Write a new partition through HCatStorer and read it back directly
    // from the warehouse directory with PigStorage.
    server.registerQuery(
        "a = load '" + tblPath.toString() + "' using PigStorage() as (a:chararray);");
    server.store("a", tblName, HCatStorer.class.getName() + "('b=2010-10-11')");
    server.registerQuery("a = load '" + warehouseDir + "/" + tblName
        + "/b=2010-10-11' using PigStorage() as (a:chararray);");
    itr = server.openIterator("a");
    result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-10-11", null);
    assertTrue(result);

    // Test multi-store: one relation stored into two partitions in one script.
    server.registerQuery(
        "a = load '" + tblPath.toString() + "' using PigStorage() as (a:chararray);");
    server.registerQuery("store a into '" + tblName + "' using " + HCatStorer.class.getName()
        + "('b=2010-11-01');");
    server.registerQuery("store a into '" + tblName + "' using " + HCatStorer.class.getName()
        + "('b=2010-11-02');");
    server.registerQuery("a = load '" + warehouseDir + "/" + tblName
        + "/b=2010-11-01' using PigStorage() as (a:chararray);");
    itr = server.openIterator("a");
    result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-11-01", null);
    assertTrue(result);
    server.registerQuery("a = load '" + warehouseDir + "/" + tblName
        + "/b=2010-11-02' using PigStorage() as (a:chararray);");
    itr = server.openIterator("a");
    result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-11-02", null);
    assertTrue(result);

    hcatDriver.run("drop table " + tblName);
  }

  /**
   * Asserts that the tuples produced by {@code itr} match the lines of a local
   * fact file, in order, and that the file is fully consumed.
   *
   * @param itr         tuples read back from the table/partition under test
   * @param factFile    local file whose lines are the expected first column
   * @param numColumn   expected tuple arity (2 when the partition key column
   *                    is projected, 1 otherwise)
   * @param key         expected value of the partition-key column (checked
   *                    only when {@code numColumn > 1})
   * @param valueSuffix suffix appended to each expected line, or null
   * @return true when all assertions pass (assertions throw otherwise)
   * @throws IOException on failure to read the fact file
   */
  private boolean compareWithFile(Iterator<Tuple> itr, String factFile, int numColumn,
      String key, String valueSuffix) throws IOException {
    // BufferedReader replaces the deprecated DataInputStream.readLine(),
    // which mangles non-ASCII bytes; close in finally so a failing assertion
    // does not leak the file handle.
    BufferedReader reader = new BufferedReader(new FileReader(factFile));
    try {
      while (itr.hasNext()) {
        Tuple t = itr.next();
        assertEquals(numColumn, t.size());
        if (t.get(0) != null) {
          // If the underlying data field is empty, PigStorage inserts null
          // instead of an empty String object.
          assertTrue(t.get(0) instanceof String);
          String expected = reader.readLine();
          if (valueSuffix != null) {
            expected += valueSuffix;
          }
          assertEquals(expected, t.get(0));
        } else {
          assertTrue(reader.readLine().isEmpty());
        }
        if (numColumn > 1) {
          // The second column must be the partition key.
          assertTrue(t.get(1) instanceof String);
          assertEquals(key, t.get(1));
        }
      }
      // Every line of the fact file must have been consumed.
      assertNull(reader.readLine());
    } finally {
      reader.close();
    }
    return true;
  }

  /**
   * Verifies that custom delimiter arguments stored in partition parameters
   * and table properties (hcat.pig.loader.args / hcat.pig.storer.args) are
   * honored: data written as tab-separated is re-stored with a ':' delimiter.
   */
  public void testDelim() throws MetaException, TException, UnknownTableException,
      NoSuchObjectException, InvalidOperationException, IOException, CommandNeedRetryException {

    hcatDriver.run("drop table junit_pigstorage_delim");

    CommandProcessorResponse resp;
    String createTable = "create table junit_pigstorage_delim (a0 string, a1 string)"
        + " partitioned by (b string) stored as RCFILE";
    resp = hcatDriver.run(createTable);
    assertEquals(0, resp.getResponseCode());
    assertNull(resp.getErrorMessage());

    resp = hcatDriver.run("alter table junit_pigstorage_delim add partition (b='2010-10-10')");
    assertEquals(0, resp.getResponseCode());
    assertNull(resp.getErrorMessage());

    resp = hcatDriver.run(
        "alter table junit_pigstorage_delim partition (b='2010-10-10') set fileformat TEXTFILE");
    // The original test never checked this response; verify it as well.
    assertEquals(0, resp.getResponseCode());
    assertNull(resp.getErrorMessage());

    // Inject bogus loader/storer arguments directly into the partition
    // parameters; loading this partition should then fail.
    Partition part = msc.getPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME,
        "junit_pigstorage_delim", "b=2010-10-10");
    Map<String, String> partParms = part.getParameters();
    partParms.put(HCatConstants.HCAT_PIG_LOADER_ARGS, "control-A");
    partParms.put(HCatConstants.HCAT_PIG_STORER_ARGS, "control-A");
    msc.alter_partition(MetaStoreUtils.DEFAULT_DATABASE_NAME, "junit_pigstorage_delim", part);

    PigServer server = new PigServer(ExecType.LOCAL, hcatConf.getAllProperties());
    UDFContext.getUDFContext().setClientSystemProps();
    server.registerQuery(
        " a = load 'junit_pigstorage_delim' using " + HCatLoader.class.getName() + ";");
    try {
      server.openIterator("a");
    } catch (FrontendException ignored) {
      // Expected: "control-A" is not a usable PigStorage delimiter argument.
      // NOTE(review): the test does not fail if no exception is thrown here;
      // confirm whether a fail() after openIterator() is intended.
    }

    resp = hcatDriver.run("alter table junit_pigstorage_delim set fileformat TEXTFILE");
    assertEquals(0, resp.getResponseCode());
    assertNull(resp.getErrorMessage());

    resp = hcatDriver.run("alter table junit_pigstorage_delim set TBLPROPERTIES"
        + " ('hcat.pig.loader.args'=':', 'hcat.pig.storer.args'=':')");
    assertEquals(0, resp.getResponseCode());
    assertNull(resp.getErrorMessage());

    // Write tab-separated input, store it through HCatStorer (which must pick
    // up the ':' storer argument), and verify the delimiter in the output.
    File inputFile = File.createTempFile("hcat_test", "");
    inputFile.deleteOnExit(); // safety net if an assertion fails before delete()
    PrintWriter p = new PrintWriter(new FileWriter(inputFile));
    p.println("1\t2");
    p.println("3\t4");
    p.close();

    server.registerQuery("a = load '" + inputFile.toString()
        + "' using PigStorage() as (a0:chararray, a1:chararray);");
    server.store("a", "junit_pigstorage_delim", HCatStorer.class.getName() + "('b=2010-10-11')");
    // Use the shared warehouseDir constant instead of a hard-coded path.
    server.registerQuery("a = load '" + warehouseDir
        + "/junit_pigstorage_delim/b=2010-10-11' using PigStorage() as (a:chararray);");
    Iterator<Tuple> itr = server.openIterator("a");
    assertTrue(itr.hasNext());
    Tuple t = itr.next();
    assertTrue(t.get(0).equals("1:2"));
    assertTrue(itr.hasNext());
    t = itr.next();
    assertTrue(t.get(0).equals("3:4"));
    assertFalse(itr.hasNext());
    inputFile.delete();
  }

  /**
   * Verifies that a custom storer class with multiple constructor arguments
   * can be configured through table properties: hcat.pig.storer names the
   * class and hcat.pig.storer.args carries '#'-delimited arguments (the
   * delimiter itself set via hcat.pig.args.delimiter). MyPigStorage appends
   * its second argument to each value, hence the ":hello" suffix check.
   */
  public void testMultiConstructArgs() throws MetaException, TException, UnknownTableException,
      NoSuchObjectException, InvalidOperationException, IOException, CommandNeedRetryException {
    String fsLoc = hcatConf.get("fs.default.name");
    Path tblPath = new Path(fsLoc, tblLocation);
    String tblName = "junit_pigstorage_constructs";

    tblPath.getFileSystem(hcatConf).copyFromLocalFile(new Path(anyExistingFileInCurDir), tblPath);
    hcatDriver.run("drop table junit_pigstorage_constructs");

    CommandProcessorResponse resp;
    String createTable =
        "create table " + tblName + " (a string) partitioned by (b string) stored as TEXTFILE";
    resp = hcatDriver.run(createTable);
    assertEquals(0, resp.getResponseCode());
    assertNull(resp.getErrorMessage());

    resp = hcatDriver.run("alter table " + tblName + " set TBLPROPERTIES"
        + " ('hcat.pig.storer'='org.apache.hcatalog.pig.MyPigStorage',"
        + " 'hcat.pig.storer.args'=':#hello', 'hcat.pig.args.delimiter'='#')");
    assertEquals(0, resp.getResponseCode());
    assertNull(resp.getErrorMessage());

    PigServer server = new PigServer(ExecType.LOCAL, hcatConf.getAllProperties());
    UDFContext.getUDFContext().setClientSystemProps();
    server.registerQuery(
        "a = load '" + tblPath.toString() + "' using PigStorage() as (a:chararray);");
    server.store("a", tblName, HCatStorer.class.getName() + "('b=2010-10-11')");
    server.registerQuery("a = load '" + warehouseDir + "/" + tblName
        + "/b=2010-10-11' using PigStorage() as (a:chararray);");
    Iterator<Tuple> itr = server.openIterator("a");
    boolean result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-10-11", ":hello");
    assertTrue(result);
  }
}