blob: 5424269b24d55de74b5d9aadec4f89d179a3770b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hcatalog.pig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import junit.framework.TestCase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hcatalog.MiniCluster;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
/**
 * Tests for {@code org.apache.hcatalog.pig.HCatEximStorer}: each test stores Pig
 * data to an export location on a {@link MiniCluster} and then verifies the
 * exported "_metadata" file (table schema, storage driver/format/serde classes,
 * and partition information).
 */
public class TestHCatEximStorer extends TestCase {

  private static final String NONPART_TABLE = "junit_unparted";
  private static final String PARTITIONED_TABLE = "junit_parted";
  private static MiniCluster cluster = MiniCluster.buildCluster();
  private static final String dataLocation = "/tmp/data";
  private static String fqdataLocation;
  private static final String exportLocation = "/tmp/export";
  private static String fqexportLocation;
  private static Properties props;

  // Storage driver / format / serde class names HCatEximStorer is expected to
  // record in the exported metadata. Hoisted here so the expectation is stated once.
  private static final String RCFILE_ISD = "org.apache.hcatalog.rcfile.RCFileInputDriver";
  private static final String RCFILE_OSD = "org.apache.hcatalog.rcfile.RCFileOutputDriver";
  private static final String RCFILE_INPUT_FORMAT =
      "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
  private static final String RCFILE_OUTPUT_FORMAT =
      "org.apache.hadoop.hive.ql.io.RCFileOutputFormat";
  private static final String COLUMNAR_SERDE =
      "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";

  // Pig schema used by the four-column load statements in most tests.
  private static final String EMPLOYEE_SCHEMA =
      "emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray";

  /** Removes the input data file and the export directory from the mini cluster. */
  private void cleanup() throws IOException {
    MiniCluster.deleteFile(cluster, dataLocation);
    MiniCluster.deleteFile(cluster, exportLocation);
  }

  @Override
  protected void setUp() throws Exception {
    props = new Properties();
    props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
    System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
        + ", fs.default.name : " + props.getProperty("fs.default.name"));
    // Fully-qualified locations are needed because Pig runs in LOCAL mode while
    // the data lives on the mini cluster's filesystem.
    fqdataLocation = cluster.getProperties().getProperty("fs.default.name") + dataLocation;
    fqexportLocation = cluster.getProperties().getProperty("fs.default.name") + exportLocation;
    System.out.println("FQ Data Location :" + fqdataLocation);
    System.out.println("FQ Export Location :" + fqexportLocation);
    cleanup();
  }

  @Override
  protected void tearDown() throws Exception {
    cleanup();
  }

  /** Writes the four-row employee CSV fixture to the data location. */
  private void populateDataFile() throws IOException {
    MiniCluster.deleteFile(cluster, dataLocation);
    String[] input = new String[] {
        "237,Krishna,01/01/1990,M,IN,TN",
        "238,Kalpana,01/01/2000,F,IN,KA",
        "239,Satya,01/01/2001,M,US,TN",
        "240,Kavya,01/01/2002,F,US,KA"
    };
    MiniCluster.createInputFile(cluster, dataLocation, input);
  }

  /** Creates a local-mode PigServer with client system props set and batch mode on. */
  private PigServer createPigServer() throws Exception {
    PigServer server = new PigServer(ExecType.LOCAL, props);
    UDFContext.getUDFContext().setClientSystemProps();
    server.setBatchOn();
    return server;
  }

  /** Reads back the exported "_metadata" file from the cluster filesystem. */
  private Map.Entry<Table, List<Partition>> readExportMetadata() throws Exception {
    FileSystem fs = cluster.getFileSystem();
    System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
        + ", fs.default.name : " + props.getProperty("fs.default.name"));
    return EximUtil.readMetaData(fs, new Path(exportLocation, "_metadata"));
  }

  /**
   * Builds the expected four-column employee schema with the given column names
   * (the id column is int; the remaining three are strings).
   */
  private List<HCatFieldSchema> buildEmployeeColumns(String idCol, String nameCol,
      String dobCol, String sexCol) throws Exception {
    List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>();
    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema(idCol,
        Constants.INT_TYPE_NAME, "")));
    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema(nameCol,
        Constants.STRING_TYPE_NAME, "")));
    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema(dobCol,
        Constants.STRING_TYPE_NAME, "")));
    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema(sexCol,
        Constants.STRING_TYPE_NAME, "")));
    return columns;
  }

  /**
   * Asserts the table-level metadata common to every exported table: database,
   * table name, column schema, and the RCFile storage driver/format/serde classes.
   */
  private void assertTableMetadata(Table table, String expectedTableName,
      List<HCatFieldSchema> expectedColumns) throws Exception {
    assertEquals("default", table.getDbName());
    assertEquals(expectedTableName, table.getTableName());
    assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
        HCatUtil.getFieldSchemaList(expectedColumns)));
    assertEquals(RCFILE_ISD, table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
    assertEquals(RCFILE_OSD, table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
    assertEquals(RCFILE_INPUT_FORMAT, table.getSd().getInputFormat());
    assertEquals(RCFILE_OUTPUT_FORMAT, table.getSd().getOutputFormat());
    assertEquals(COLUMNAR_SERDE, table.getSd().getSerdeInfo().getSerializationLib());
  }

  /** Asserts the RCFile storage driver parameters recorded on an exported partition. */
  private void assertPartitionDrivers(Partition partition) {
    assertEquals(RCFILE_ISD, partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
    assertEquals(RCFILE_OSD, partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
  }

  /** Stores an unpartitioned table and verifies its exported metadata. */
  public void testStoreNonPartTable() throws Exception {
    populateDataFile();
    PigServer server = createPigServer();
    server.registerQuery("A = load '" + fqdataLocation
        + "' using PigStorage(',') as (" + EMPLOYEE_SCHEMA + ");");
    server.registerQuery("store A into '" + NONPART_TABLE
        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');");
    server.executeBatch();

    Map.Entry<Table, List<Partition>> metadata = readExportMetadata();
    Table table = metadata.getKey();
    List<Partition> partitions = metadata.getValue();
    assertTableMetadata(table, NONPART_TABLE,
        buildEmployeeColumns("emp_id", "emp_name", "emp_dob", "emp_sex"));
    // An unpartitioned table has neither partition keys nor partitions.
    assertEquals(0, table.getPartitionKeys().size());
    assertEquals(0, partitions.size());
  }

  /** Stores one partition (country, state) and verifies keys and values in order. */
  public void testStorePartTable() throws Exception {
    populateDataFile();
    PigServer server = createPigServer();
    server.registerQuery("A = load '" + fqdataLocation
        + "' using PigStorage(',') as (" + EMPLOYEE_SCHEMA + ");");
    server.registerQuery("store A into '" + PARTITIONED_TABLE
        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation
        + "', 'emp_country=in,emp_state=tn');");
    server.executeBatch();

    Map.Entry<Table, List<Partition>> metadata = readExportMetadata();
    Table table = metadata.getKey();
    List<Partition> partitions = metadata.getValue();
    assertTableMetadata(table, PARTITIONED_TABLE,
        buildEmployeeColumns("emp_id", "emp_name", "emp_dob", "emp_sex"));
    // Partition keys must appear in the order given to the storer.
    assertEquals(2, table.getPartitionKeys().size());
    List<FieldSchema> partSchema = table.getPartitionKeys();
    assertEquals("emp_country", partSchema.get(0).getName());
    assertEquals("emp_state", partSchema.get(1).getName());
    assertEquals(1, partitions.size());
    Partition partition = partitions.get(0);
    assertEquals("in", partition.getValues().get(0));
    assertEquals("tn", partition.getValues().get(1));
    assertPartitionDrivers(partition);
  }

  /** Same as {@link #testStorePartTable} with the key order reversed (state, country). */
  public void testStorePartTable_state_country() throws Exception {
    populateDataFile();
    PigServer server = createPigServer();
    server.registerQuery("A = load '" + fqdataLocation
        + "' using PigStorage(',') as (" + EMPLOYEE_SCHEMA + ");");
    server.registerQuery("store A into '" + PARTITIONED_TABLE
        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation
        + "', 'emp_state=tn,emp_country=in');");
    server.executeBatch();

    Map.Entry<Table, List<Partition>> metadata = readExportMetadata();
    Table table = metadata.getKey();
    List<Partition> partitions = metadata.getValue();
    assertTableMetadata(table, PARTITIONED_TABLE,
        buildEmployeeColumns("emp_id", "emp_name", "emp_dob", "emp_sex"));
    assertEquals(2, table.getPartitionKeys().size());
    List<FieldSchema> partSchema = table.getPartitionKeys();
    assertEquals("emp_state", partSchema.get(0).getName());
    assertEquals("emp_country", partSchema.get(1).getName());
    assertEquals(1, partitions.size());
    Partition partition = partitions.get(0);
    assertEquals("tn", partition.getValues().get(0));
    assertEquals("in", partition.getValues().get(1));
    assertPartitionDrivers(partition);
  }

  /** Stores with an explicit compatible output schema; exported columns use the new names. */
  public void testStoreNonPartCompatSchemaTable() throws Exception {
    populateDataFile();
    PigServer server = createPigServer();
    server.registerQuery("A = load '" + fqdataLocation
        + "' using PigStorage(',') as (" + EMPLOYEE_SCHEMA + ");");
    server.registerQuery("store A into '" + NONPART_TABLE
        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation
        + "', '', 'id:int, name:chararray, dob:chararray, sex:chararray');");
    server.executeBatch();

    Map.Entry<Table, List<Partition>> metadata = readExportMetadata();
    Table table = metadata.getKey();
    List<Partition> partitions = metadata.getValue();
    assertTableMetadata(table, NONPART_TABLE,
        buildEmployeeColumns("id", "name", "dob", "sex"));
    assertEquals(0, table.getPartitionKeys().size());
    assertEquals(0, partitions.size());
  }

  /** An incompatible output schema (sex declared int) must fail the batch. */
  public void testStoreNonPartNonCompatSchemaTable() throws Exception {
    populateDataFile();
    PigServer server = createPigServer();
    server.registerQuery("A = load '" + fqdataLocation
        + "' using PigStorage(',') as (" + EMPLOYEE_SCHEMA + ");");
    server.registerQuery("store A into '" + NONPART_TABLE
        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation
        + "', '', 'id:int, name:chararray, dob:chararray, sex:int');");
    try {
      server.executeBatch();
      fail("Expected exception not thrown");
    } catch (FrontendException ignored) {
      // Expected: the storer rejects the type mismatch at plan time.
    }
  }

  /** Stores four partitions into one export and verifies all are recorded. */
  public void testStoreMultiPartTable() throws Exception {
    populateDataFile();
    PigServer server = createPigServer();
    server.registerQuery("A = load '" + fqdataLocation
        + "' using PigStorage(',') as (" + EMPLOYEE_SCHEMA
        + ", emp_country:chararray, emp_state:chararray);");
    server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';");
    server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';");
    server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';");
    server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';");
    server.registerQuery("store INTN into '" + PARTITIONED_TABLE
        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation
        + "', 'emp_country=in,emp_state=tn');");
    server.registerQuery("store INKA into '" + PARTITIONED_TABLE
        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation
        + "', 'emp_country=in,emp_state=ka');");
    server.registerQuery("store USTN into '" + PARTITIONED_TABLE
        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation
        + "', 'emp_country=us,emp_state=tn');");
    server.registerQuery("store USKA into '" + PARTITIONED_TABLE
        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation
        + "', 'emp_country=us,emp_state=ka');");
    server.executeBatch();

    Map.Entry<Table, List<Partition>> metadata = readExportMetadata();
    Table table = metadata.getKey();
    List<Partition> partitions = metadata.getValue();
    assertTableMetadata(table, PARTITIONED_TABLE,
        buildEmployeeColumns("emp_id", "emp_name", "emp_dob", "emp_sex"));
    assertEquals(2, table.getPartitionKeys().size());
    List<FieldSchema> partSchema = table.getPartitionKeys();
    assertEquals("emp_country", partSchema.get(0).getName());
    assertEquals("emp_state", partSchema.get(1).getName());
    assertEquals(4, partitions.size());
    // Exported partitions may come back in any order: check as a set of
    // "country,state" values, removing each as it is matched.
    Set<String> parts = new TreeSet<String>();
    parts.add("in,tn");
    parts.add("in,ka");
    parts.add("us,tn");
    parts.add("us,ka");
    for (Partition partition : partitions) {
      assertPartitionDrivers(partition);
      assertTrue(parts.remove(partition.getValues().get(0) + "," + partition.getValues().get(1)));
    }
    assertEquals(0, parts.size());
  }
}