| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.hcatalog.pig; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.TreeSet; |
| |
| import junit.framework.TestCase; |
| |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hcatalog.MiniCluster; |
| import org.apache.hcatalog.common.HCatConstants; |
| import org.apache.hcatalog.common.HCatUtil; |
| import org.apache.hcatalog.data.schema.HCatFieldSchema; |
| import org.apache.hcatalog.data.schema.HCatSchemaUtils; |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.hadoop.hive.metastore.api.Partition; |
| import org.apache.hadoop.hive.metastore.api.Table; |
| import org.apache.hadoop.hive.ql.parse.EximUtil; |
| import org.apache.hadoop.hive.serde.Constants; |
| import org.apache.pig.ExecType; |
| import org.apache.pig.PigServer; |
| import org.apache.pig.impl.logicalLayer.FrontendException; |
| import org.apache.pig.impl.util.UDFContext; |
| |
| public class TestHCatEximStorer extends TestCase { |
| |
| private static final String NONPART_TABLE = "junit_unparted"; |
| private static final String PARTITIONED_TABLE = "junit_parted"; |
| private static MiniCluster cluster = MiniCluster.buildCluster(); |
| |
| private static final String dataLocation = "/tmp/data"; |
| private static String fqdataLocation; |
| private static final String exportLocation = "/tmp/export"; |
| private static String fqexportLocation; |
| |
| private static Properties props; |
| |
| private void cleanup() throws IOException { |
| MiniCluster.deleteFile(cluster, dataLocation); |
| MiniCluster.deleteFile(cluster, exportLocation); |
| } |
| |
| @Override |
| protected void setUp() throws Exception { |
| props = new Properties(); |
| props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name")); |
| System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() + ", fs.default.name : " + props.getProperty("fs.default.name")); |
| fqdataLocation = cluster.getProperties().getProperty("fs.default.name") + dataLocation; |
| fqexportLocation = cluster.getProperties().getProperty("fs.default.name") + exportLocation; |
| System.out.println("FQ Data Location :" + fqdataLocation); |
| System.out.println("FQ Export Location :" + fqexportLocation); |
| cleanup(); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| cleanup(); |
| } |
| |
| private void populateDataFile() throws IOException { |
| MiniCluster.deleteFile(cluster, dataLocation); |
| String[] input = new String[] { |
| "237,Krishna,01/01/1990,M,IN,TN", |
| "238,Kalpana,01/01/2000,F,IN,KA", |
| "239,Satya,01/01/2001,M,US,TN", |
| "240,Kavya,01/01/2002,F,US,KA" |
| }; |
| MiniCluster.createInputFile(cluster, dataLocation, input); |
| } |
| |
| public void testStoreNonPartTable() throws Exception { |
| populateDataFile(); |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| server.setBatchOn(); |
| server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); |
| server.registerQuery("store A into '" + NONPART_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');"); |
| server.executeBatch(); |
| |
| FileSystem fs = cluster.getFileSystem(); |
| |
| System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() + ", fs.default.name : " + props.getProperty("fs.default.name")); |
| |
| Map.Entry<Table, List<Partition>> metadata = EximUtil.readMetaData(fs, new Path(exportLocation, "_metadata")); |
| Table table = metadata.getKey(); |
| List<Partition> partitions = metadata.getValue(); |
| |
| List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>(); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", |
| Constants.INT_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex", |
| Constants.STRING_TYPE_NAME, ""))); |
| |
| |
| assertEquals("default", table.getDbName()); |
| assertEquals(NONPART_TABLE, table.getTableName()); |
| assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), |
| HCatUtil.getFieldSchemaList(columns))); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", |
| table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", |
| table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", |
| table.getSd().getInputFormat()); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", |
| table.getSd().getOutputFormat()); |
| assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", |
| table.getSd().getSerdeInfo().getSerializationLib()); |
| assertEquals(0, table.getPartitionKeys().size()); |
| |
| assertEquals(0, partitions.size()); |
| } |
| |
| public void testStorePartTable() throws Exception { |
| populateDataFile(); |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| server.setBatchOn(); |
| server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); |
| server.registerQuery("store A into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=in,emp_state=tn');"); |
| server.executeBatch(); |
| |
| FileSystem fs = cluster.getFileSystem(); |
| |
| System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() + ", fs.default.name : " + props.getProperty("fs.default.name")); |
| |
| Map.Entry<Table, List<Partition>> metadata = EximUtil.readMetaData(fs, new Path(exportLocation, "_metadata")); |
| Table table = metadata.getKey(); |
| List<Partition> partitions = metadata.getValue(); |
| |
| List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>(); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", |
| Constants.INT_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex", |
| Constants.STRING_TYPE_NAME, ""))); |
| |
| |
| assertEquals("default", table.getDbName()); |
| assertEquals(PARTITIONED_TABLE, table.getTableName()); |
| assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), |
| HCatUtil.getFieldSchemaList(columns))); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", |
| table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", |
| table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", |
| table.getSd().getInputFormat()); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", |
| table.getSd().getOutputFormat()); |
| assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", |
| table.getSd().getSerdeInfo().getSerializationLib()); |
| assertEquals(2, table.getPartitionKeys().size()); |
| List<FieldSchema> partSchema = table.getPartitionKeys(); |
| assertEquals("emp_country", partSchema.get(0).getName()); |
| assertEquals("emp_state", partSchema.get(1).getName()); |
| |
| assertEquals(1, partitions.size()); |
| Partition partition = partitions.get(0); |
| assertEquals("in", partition.getValues().get(0)); |
| assertEquals("tn", partition.getValues().get(1)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", |
| partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", |
| partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); |
| } |
| |
| public void testStorePartTable_state_country() throws Exception { |
| populateDataFile(); |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| server.setBatchOn(); |
| server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); |
| server.registerQuery("store A into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_state=tn,emp_country=in');"); |
| server.executeBatch(); |
| |
| FileSystem fs = cluster.getFileSystem(); |
| |
| System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() + ", fs.default.name : " + props.getProperty("fs.default.name")); |
| |
| Map.Entry<Table, List<Partition>> metadata = EximUtil.readMetaData(fs, new Path(exportLocation, "_metadata")); |
| Table table = metadata.getKey(); |
| List<Partition> partitions = metadata.getValue(); |
| |
| List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>(); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", |
| Constants.INT_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex", |
| Constants.STRING_TYPE_NAME, ""))); |
| |
| |
| assertEquals("default", table.getDbName()); |
| assertEquals(PARTITIONED_TABLE, table.getTableName()); |
| assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), |
| HCatUtil.getFieldSchemaList(columns))); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", |
| table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", |
| table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", |
| table.getSd().getInputFormat()); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", |
| table.getSd().getOutputFormat()); |
| assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", |
| table.getSd().getSerdeInfo().getSerializationLib()); |
| assertEquals(2, table.getPartitionKeys().size()); |
| List<FieldSchema> partSchema = table.getPartitionKeys(); |
| assertEquals("emp_state", partSchema.get(0).getName()); |
| assertEquals("emp_country", partSchema.get(1).getName()); |
| |
| assertEquals(1, partitions.size()); |
| Partition partition = partitions.get(0); |
| assertEquals("tn", partition.getValues().get(0)); |
| assertEquals("in", partition.getValues().get(1)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", |
| partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", |
| partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); |
| } |
| |
| public void testStoreNonPartCompatSchemaTable() throws Exception { |
| populateDataFile(); |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| server.setBatchOn(); |
| server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); |
| server.registerQuery("store A into '" + NONPART_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', '', 'id:int, name:chararray, dob:chararray, sex:chararray');"); |
| server.executeBatch(); |
| |
| FileSystem fs = cluster.getFileSystem(); |
| |
| System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() + ", fs.default.name : " + props.getProperty("fs.default.name")); |
| |
| Map.Entry<Table, List<Partition>> metadata = EximUtil.readMetaData(fs, new Path(exportLocation, "_metadata")); |
| Table table = metadata.getKey(); |
| List<Partition> partitions = metadata.getValue(); |
| |
| List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>(); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("id", |
| Constants.INT_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("name", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("dob", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("sex", |
| Constants.STRING_TYPE_NAME, ""))); |
| |
| |
| assertEquals("default", table.getDbName()); |
| assertEquals(NONPART_TABLE, table.getTableName()); |
| assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), |
| HCatUtil.getFieldSchemaList(columns))); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", |
| table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", |
| table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", |
| table.getSd().getInputFormat()); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", |
| table.getSd().getOutputFormat()); |
| assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", |
| table.getSd().getSerdeInfo().getSerializationLib()); |
| assertEquals(0, table.getPartitionKeys().size()); |
| |
| assertEquals(0, partitions.size()); |
| } |
| |
| public void testStoreNonPartNonCompatSchemaTable() throws Exception { |
| populateDataFile(); |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| server.setBatchOn(); |
| server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); |
| server.registerQuery("store A into '" + NONPART_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', '', 'id:int, name:chararray, dob:chararray, sex:int');"); |
| try { |
| server.executeBatch(); |
| fail("Expected exception not thrown"); |
| } catch (FrontendException e) { |
| } |
| } |
| |
| public void testStoreMultiPartTable() throws Exception { |
| populateDataFile(); |
| PigServer server = new PigServer(ExecType.LOCAL, props); |
| UDFContext.getUDFContext().setClientSystemProps(); |
| server.setBatchOn(); |
| server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);"); |
| server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';"); |
| server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';"); |
| server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';"); |
| server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';"); |
| server.registerQuery("store INTN into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=in,emp_state=tn');"); |
| server.registerQuery("store INKA into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=in,emp_state=ka');"); |
| server.registerQuery("store USTN into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=us,emp_state=tn');"); |
| server.registerQuery("store USKA into '" + PARTITIONED_TABLE |
| + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=us,emp_state=ka');"); |
| server.executeBatch(); |
| |
| FileSystem fs = cluster.getFileSystem(); |
| |
| System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() + ", fs.default.name : " + props.getProperty("fs.default.name")); |
| |
| Map.Entry<Table, List<Partition>> metadata = EximUtil.readMetaData(fs, new Path(exportLocation, "_metadata")); |
| Table table = metadata.getKey(); |
| List<Partition> partitions = metadata.getValue(); |
| |
| List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>(); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", |
| Constants.INT_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex", |
| Constants.STRING_TYPE_NAME, ""))); |
| |
| |
| assertEquals("default", table.getDbName()); |
| assertEquals(PARTITIONED_TABLE, table.getTableName()); |
| assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), |
| HCatUtil.getFieldSchemaList(columns))); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", |
| table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", |
| table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", |
| table.getSd().getInputFormat()); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", |
| table.getSd().getOutputFormat()); |
| assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", |
| table.getSd().getSerdeInfo().getSerializationLib()); |
| assertEquals(2, table.getPartitionKeys().size()); |
| List<FieldSchema> partSchema = table.getPartitionKeys(); |
| assertEquals("emp_country", partSchema.get(0).getName()); |
| assertEquals("emp_state", partSchema.get(1).getName()); |
| |
| assertEquals(4, partitions.size()); |
| Set<String> parts = new TreeSet<String>(); |
| parts.add("in,tn"); |
| parts.add("in,ka"); |
| parts.add("us,tn"); |
| parts.add("us,ka"); |
| |
| for (Partition partition : partitions) { |
| assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", |
| partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", |
| partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); |
| assertTrue(parts.remove(partition.getValues().get(0) + "," + partition.getValues().get(1))); |
| } |
| assertEquals(0, parts.size()); |
| } |
| } |