| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.hcatalog.pig; |
| |
| import java.io.BufferedInputStream; |
| import java.io.DataInputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.util.Iterator; |
| import java.util.Map; |
| |
| import junit.framework.TestCase; |
| |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.cli.CliSessionState; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.conf.HiveConf.ConfVars; |
| import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.MetaStoreUtils; |
| import org.apache.hadoop.hive.metastore.api.InvalidOperationException; |
| import org.apache.hadoop.hive.metastore.api.MetaException; |
| import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; |
| import org.apache.hadoop.hive.metastore.api.Partition; |
| import org.apache.hadoop.hive.metastore.api.UnknownTableException; |
| import org.apache.hadoop.hive.ql.CommandNeedRetryException; |
| import org.apache.hadoop.hive.ql.Driver; |
| import org.apache.hadoop.hive.ql.io.RCFileInputFormat; |
| import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; |
| import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; |
| import org.apache.hadoop.hive.ql.session.SessionState; |
| import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; |
| import org.apache.hcatalog.common.HCatConstants; |
| import org.apache.hcatalog.pig.HCatLoader; |
| import org.apache.pig.ExecType; |
| import org.apache.pig.PigServer; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.logicalLayer.FrontendException; |
| import org.apache.pig.impl.util.UDFContext; |
| import org.apache.thrift.TException; |
| |
| public class TestPigStorageDriver extends TestCase { |
| |
| private HiveConf hcatConf; |
| private Driver hcatDriver; |
| private HiveMetaStoreClient msc; |
| private static String tblLocation = "/tmp/test_pig/data"; |
| private static String anyExistingFileInCurDir = "ivy.xml"; |
| private static String warehouseDir = "/tmp/hcat_junit_warehouse"; |
| |
| @Override |
| protected void setUp() throws Exception { |
| |
| hcatConf = new HiveConf(this.getClass()); |
| hcatConf.set(ConfVars.PREEXECHOOKS.varname, ""); |
| hcatConf.set(ConfVars.POSTEXECHOOKS.varname, ""); |
| hcatConf.set(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); |
| hcatConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName()); |
| hcatDriver = new Driver(hcatConf); |
| msc = new HiveMetaStoreClient(hcatConf); |
| SessionState.start(new CliSessionState(hcatConf)); |
| super.setUp(); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| super.tearDown(); |
| } |
| |
| public void testPigStorageDriver() throws IOException, CommandNeedRetryException{ |
| |
| String fsLoc = hcatConf.get("fs.default.name"); |
| Path tblPath = new Path(fsLoc, tblLocation); |
| String tblName = "junit_pigstorage"; |
| tblPath.getFileSystem(hcatConf).copyFromLocalFile(new Path(anyExistingFileInCurDir),tblPath); |
| |
| hcatDriver.run("drop table " + tblName); |
| CommandProcessorResponse resp; |
| String createTable = "create table " + tblName + " (a string) partitioned by (b string) stored as TEXTFILE"; |
| |
| resp = hcatDriver.run(createTable); |
| assertEquals(0, resp.getResponseCode()); |
| assertNull(resp.getErrorMessage()); |
| |
| resp = hcatDriver.run("alter table " + tblName + " add partition (b='2010-10-10') location '"+new Path(fsLoc, "/tmp/test_pig")+"'"); |
| assertEquals(0, resp.getResponseCode()); |
| assertNull(resp.getErrorMessage()); |
| |
| resp = hcatDriver.run("alter table " + tblName + " partition (b='2010-10-10') set fileformat TEXTFILE"); |
| assertEquals(0, resp.getResponseCode()); |
| assertNull(resp.getErrorMessage()); |
| |
| resp = hcatDriver.run("desc extended " + tblName + " partition (b='2010-10-10')"); |
| assertEquals(0, resp.getResponseCode()); |
| assertNull(resp.getErrorMessage()); |
| |
| PigServer server = new PigServer(ExecType.LOCAL, hcatConf.getAllProperties()); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| server.registerQuery(" a = load '" + tblName + "' using "+HCatLoader.class.getName()+";"); |
| Iterator<Tuple> itr = server.openIterator("a"); |
| boolean result = compareWithFile(itr, anyExistingFileInCurDir, 2, "2010-10-10", null); |
| assertTrue(result); |
| |
| server.registerQuery("a = load '"+tblPath.toString()+"' using PigStorage() as (a:chararray);"); |
| server.store("a", tblName, HCatStorer.class.getName() + "('b=2010-10-11')"); |
| |
| server.registerQuery("a = load '" + warehouseDir + "/" + tblName + "/b=2010-10-11' using PigStorage() as (a:chararray);"); |
| itr = server.openIterator("a"); |
| result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-10-11", null); |
| assertTrue(result); |
| |
| // Test multi-store |
| server.registerQuery("a = load '"+tblPath.toString()+"' using PigStorage() as (a:chararray);"); |
| server.registerQuery("store a into '" + tblName + "' using " + HCatStorer.class.getName() + "('b=2010-11-01');"); |
| server.registerQuery("store a into '" + tblName + "' using " + HCatStorer.class.getName() + "('b=2010-11-02');"); |
| |
| server.registerQuery("a = load '" + warehouseDir + "/" + tblName + "/b=2010-11-01' using PigStorage() as (a:chararray);"); |
| itr = server.openIterator("a"); |
| result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-11-01", null); |
| assertTrue(result); |
| |
| server.registerQuery("a = load '" + warehouseDir + "/" + tblName + "/b=2010-11-02' using PigStorage() as (a:chararray);"); |
| itr = server.openIterator("a"); |
| result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-11-02", null); |
| assertTrue(result); |
| |
| hcatDriver.run("drop table " + tblName); |
| } |
| |
| private boolean compareWithFile(Iterator<Tuple> itr, String factFile, int numColumn, String key, String valueSuffix) throws IOException { |
| DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(new File(factFile)))); |
| while(itr.hasNext()){ |
| Tuple t = itr.next(); |
| assertEquals(numColumn, t.size()); |
| if(t.get(0) != null) { |
| // If underlying data-field is empty. PigStorage inserts null instead |
| // of empty String objects. |
| assertTrue(t.get(0) instanceof String); |
| String expected = stream.readLine(); |
| if (valueSuffix!=null) |
| expected += valueSuffix; |
| assertEquals(expected, t.get(0)); |
| } |
| else{ |
| assertTrue(stream.readLine().isEmpty()); |
| } |
| |
| if (numColumn>1) { |
| // The second column must be key |
| assertTrue(t.get(1) instanceof String); |
| assertEquals(key, t.get(1)); |
| } |
| } |
| assertEquals(0,stream.available()); |
| stream.close(); |
| return true; |
| } |
| |
| public void testDelim() throws MetaException, TException, UnknownTableException, NoSuchObjectException, InvalidOperationException, IOException, CommandNeedRetryException{ |
| |
| hcatDriver.run("drop table junit_pigstorage_delim"); |
| |
| CommandProcessorResponse resp; |
| String createTable = "create table junit_pigstorage_delim (a0 string, a1 string) partitioned by (b string) stored as RCFILE"; |
| |
| resp = hcatDriver.run(createTable); |
| |
| assertEquals(0, resp.getResponseCode()); |
| assertNull(resp.getErrorMessage()); |
| |
| resp = hcatDriver.run("alter table junit_pigstorage_delim add partition (b='2010-10-10')"); |
| assertEquals(0, resp.getResponseCode()); |
| assertNull(resp.getErrorMessage()); |
| |
| resp = hcatDriver.run("alter table junit_pigstorage_delim partition (b='2010-10-10') set fileformat TEXTFILE"); |
| |
| Partition part = msc.getPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, "junit_pigstorage_delim", "b=2010-10-10"); |
| Map<String,String> partParms = part.getParameters(); |
| partParms.put(HCatConstants.HCAT_PIG_LOADER_ARGS, "control-A"); |
| partParms.put(HCatConstants.HCAT_PIG_STORER_ARGS, "control-A"); |
| |
| msc.alter_partition(MetaStoreUtils.DEFAULT_DATABASE_NAME, "junit_pigstorage_delim", part); |
| |
| PigServer server = new PigServer(ExecType.LOCAL, hcatConf.getAllProperties()); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| server.registerQuery(" a = load 'junit_pigstorage_delim' using "+HCatLoader.class.getName()+";"); |
| try{ |
| server.openIterator("a"); |
| }catch(FrontendException fe){} |
| |
| resp = hcatDriver.run("alter table junit_pigstorage_delim set fileformat TEXTFILE"); |
| assertEquals(0, resp.getResponseCode()); |
| assertNull(resp.getErrorMessage()); |
| resp = hcatDriver.run("alter table junit_pigstorage_delim set TBLPROPERTIES ('hcat.pig.loader.args'=':', 'hcat.pig.storer.args'=':')"); |
| assertEquals(0, resp.getResponseCode()); |
| assertNull(resp.getErrorMessage()); |
| |
| File inputFile = File.createTempFile("hcat_test", ""); |
| PrintWriter p = new PrintWriter(new FileWriter(inputFile)); |
| p.println("1\t2"); |
| p.println("3\t4"); |
| p.close(); |
| server.registerQuery("a = load '"+inputFile.toString()+"' using PigStorage() as (a0:chararray, a1:chararray);"); |
| server.store("a", "junit_pigstorage_delim", HCatStorer.class.getName() + "('b=2010-10-11')"); |
| |
| server.registerQuery("a = load '/tmp/hcat_junit_warehouse/junit_pigstorage_delim/b=2010-10-11' using PigStorage() as (a:chararray);"); |
| Iterator<Tuple> itr = server.openIterator("a"); |
| |
| assertTrue(itr.hasNext()); |
| Tuple t = itr.next(); |
| assertTrue(t.get(0).equals("1:2")); |
| |
| assertTrue(itr.hasNext()); |
| t = itr.next(); |
| assertTrue(t.get(0).equals("3:4")); |
| |
| assertFalse(itr.hasNext()); |
| inputFile.delete(); |
| } |
| |
| public void testMultiConstructArgs() throws MetaException, TException, UnknownTableException, NoSuchObjectException, InvalidOperationException, IOException, CommandNeedRetryException{ |
| |
| String fsLoc = hcatConf.get("fs.default.name"); |
| Path tblPath = new Path(fsLoc, tblLocation); |
| String tblName = "junit_pigstorage_constructs"; |
| tblPath.getFileSystem(hcatConf).copyFromLocalFile(new Path(anyExistingFileInCurDir),tblPath); |
| |
| hcatDriver.run("drop table junit_pigstorage_constructs"); |
| |
| CommandProcessorResponse resp; |
| String createTable = "create table " + tblName + " (a string) partitioned by (b string) stored as TEXTFILE"; |
| |
| resp = hcatDriver.run(createTable); |
| |
| assertEquals(0, resp.getResponseCode()); |
| assertNull(resp.getErrorMessage()); |
| |
| resp = hcatDriver.run("alter table " + tblName + " set TBLPROPERTIES ('hcat.pig.storer'='org.apache.hcatalog.pig.MyPigStorage', 'hcat.pig.storer.args'=':#hello', 'hcat.pig.args.delimiter'='#')"); |
| assertEquals(0, resp.getResponseCode()); |
| assertNull(resp.getErrorMessage()); |
| |
| PigServer server = new PigServer(ExecType.LOCAL, hcatConf.getAllProperties()); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| |
| server.registerQuery("a = load '"+tblPath.toString()+"' using PigStorage() as (a:chararray);"); |
| server.store("a", tblName, HCatStorer.class.getName() + "('b=2010-10-11')"); |
| |
| server.registerQuery("a = load '" + warehouseDir + "/" + tblName + "/b=2010-10-11' using PigStorage() as (a:chararray);"); |
| Iterator<Tuple> itr = server.openIterator("a"); |
| boolean result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-10-11", ":hello"); |
| assertTrue(result); |
| } |
| } |