/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hcatalog.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
import org.apache.hcatalog.mapreduce.TestHCatEximInputFormat.TestImport.EmpDetails;

/**
 * TestHCatEximInputFormat: primarily tests HCatEximInputFormat, but also
 * exercises HCatEximOutputFormat to produce the export data it reads back.
 */
public class TestHCatEximInputFormat extends TestCase {
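  /**
   * Mapper used on the export side: parses comma-separated text records and
   * writes them as HCatRecords using the table schema obtained from
   * HCatEximOutputFormat.
   */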
  public static class TestExport extends
      org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, HCatRecord> {

    private HCatSchema recordSchema;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);
      recordSchema = HCatEximOutputFormat.getTableSchema(context);
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] cols = value.toString().split(",");
      HCatRecord record = new DefaultHCatRecord(recordSchema.size());
      record.setInteger("emp_id", recordSchema, Integer.parseInt(cols[0]));
      record.setString("emp_name", recordSchema, cols[1]);
      record.setString("emp_dob", recordSchema, cols[2]);
      record.setString("emp_sex", recordSchema, cols[3]);
      context.write(key, record);
    }
  }
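  /**
   * Mapper used on the import side: reads HCatRecords and collects them into
   * the static empRecords map keyed by emp_id, for the tests to assert on.
   * Fields absent from the projected schema are left null.
   */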
  public static class TestImport extends
      org.apache.hadoop.mapreduce.Mapper<
          org.apache.hadoop.io.LongWritable, HCatRecord,
          org.apache.hadoop.io.Text,
          org.apache.hadoop.io.Text> {

    private HCatSchema recordSchema;

    public static class EmpDetails {
      public String emp_name;
      public String emp_dob;
      public String emp_sex;
      public String emp_country;
      public String emp_state;
    }

    public static Map<Integer, EmpDetails> empRecords = new TreeMap<Integer, EmpDetails>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);
      try {
        recordSchema = HCatBaseInputFormat.getOutputSchema(context);
      } catch (Exception e) {
        throw new IOException("Error getting output schema from job configuration", e);
      }
      System.out.println("RecordSchema : " + recordSchema.toString());
    }

    @Override
    public void map(LongWritable key, HCatRecord value, Context context)
        throws IOException, InterruptedException {
      EmpDetails empDetails = new EmpDetails();
      Integer emp_id = value.getInteger("emp_id", recordSchema);
      empDetails.emp_name = value.getString("emp_name", recordSchema);
      if (recordSchema.getPosition("emp_dob") != null) {
        empDetails.emp_dob = value.getString("emp_dob", recordSchema);
      }
      if (recordSchema.getPosition("emp_sex") != null) {
        empDetails.emp_sex = value.getString("emp_sex", recordSchema);
      }
      if (recordSchema.getPosition("emp_country") != null) {
        empDetails.emp_country = value.getString("emp_country", recordSchema);
      }
      if (recordSchema.getPosition("emp_state") != null) {
        empDetails.emp_state = value.getString("emp_state", recordSchema);
      }
      empRecords.put(emp_id, empDetails);
    }
  }
  private static final String dbName = "hcatEximOutputFormatTestDB";
  private static final String tblName = "hcatEximOutputFormatTestTable";

  Configuration conf;
  Job job;
  List<HCatFieldSchema> columns;
  HCatSchema schema;
  FileSystem fs;
  Path inputLocation;
  Path outputLocation;
  private HCatSchema partSchema;
  @Override
  protected void setUp() throws Exception {
    System.out.println("Setup started");
    super.setUp();
    conf = new Configuration();
    job = new Job(conf, "test eximinputformat");
    columns = new ArrayList<HCatFieldSchema>();
    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
        Constants.INT_TYPE_NAME, "")));
    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
        Constants.STRING_TYPE_NAME, "")));
    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob",
        Constants.STRING_TYPE_NAME, "")));
    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex",
        Constants.STRING_TYPE_NAME, "")));
    schema = new HCatSchema(columns);
    fs = new LocalFileSystem();
    fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
    inputLocation = new Path(fs.getWorkingDirectory(), "tmp/exports");
    outputLocation = new Path(fs.getWorkingDirectory(), "tmp/data");
    job.setJarByClass(this.getClass());
    job.setNumReduceTasks(0);
    System.out.println("Setup done");
  }
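  /**
   * Writes the given records to a local text file and configures the job to
   * export them through HCatEximOutputFormat via the TestExport mapper.
   */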
  private void setupMRExport(String[] records) throws IOException {
    if (fs.exists(outputLocation)) {
      fs.delete(outputLocation, true);
    }
    FSDataOutputStream ds = fs.create(outputLocation, true);
    for (String record : records) {
      ds.writeBytes(record);
    }
    ds.close();
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HCatEximOutputFormat.class);
    TextInputFormat.setInputPaths(job, outputLocation);
    job.setMapperClass(TestExport.class);
  }
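  /**
   * Configures the job to read the exported data back through
   * HCatEximInputFormat and write it out as text via the TestImport mapper.
   */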
  private void setupMRImport() throws IOException {
    if (fs.exists(outputLocation)) {
      fs.delete(outputLocation, true);
    }
    job.setInputFormatClass(HCatEximInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, outputLocation);
    job.setMapperClass(TestImport.class);
    TestImport.empRecords.clear();
  }
  @Override
  protected void tearDown() throws Exception {
    System.out.println("Teardown started");
    super.tearDown();
    // fs.delete(inputLocation, true);
    // fs.delete(outputLocation, true);
    System.out.println("Teardown done");
  }
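  /**
   * Exports four sample records to an unpartitioned table layout under
   * inputLocation, then finalizes the export via the output committer.
   */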
  private void runNonPartExport()
      throws IOException, InterruptedException, ClassNotFoundException {
    if (fs.exists(inputLocation)) {
      fs.delete(inputLocation, true);
    }
    setupMRExport(new String[] {
        "237,Krishna,01/01/1990,M,IN,TN\n",
        "238,Kalpana,01/01/2000,F,IN,KA\n",
        "239,Satya,01/01/2001,M,US,TN\n",
        "240,Kavya,01/01/2002,F,US,KA\n"
    });
    HCatEximOutputFormat.setOutput(
        job,
        dbName,
        tblName,
        inputLocation.toString(),
        null,
        null,
        schema);
    job.waitForCompletion(true);
    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job, null);
    committer.cleanupJob(job);
  }
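  /**
   * Exports a single record into the partition identified by the given
   * country and state values.
   */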
  private void runPartExport(String record, String country, String state)
      throws IOException, InterruptedException, ClassNotFoundException {
    setupMRExport(new String[] {record});
    List<String> partValues = new ArrayList<String>(2);
    partValues.add(country);
    partValues.add(state);
    HCatEximOutputFormat.setOutput(
        job,
        dbName,
        tblName,
        inputLocation.toString(),
        partSchema,
        partValues,
        schema);
    job.waitForCompletion(true);
    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job, null);
    committer.cleanupJob(job);
  }
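  /** Round-trips an unpartitioned export and verifies all columns are read back. */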
  public void testNonPart() throws Exception {
    try {
      runNonPartExport();
      setUp();
      setupMRImport();
      HCatEximInputFormat.setInput(job, "tmp/exports", null);
      job.waitForCompletion(true);

      assertEquals(4, TestImport.empRecords.size());
      assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", null, null);
      assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", null, null);
      assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", null, null);
      assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", null, null);
    } catch (Exception e) {
      System.out.println("Test failed with " + e.getMessage());
      e.printStackTrace();
      throw e;
    }
  }
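  /** Reads back only emp_id and emp_name via a projection schema; other fields stay null. */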
  public void testNonPartProjection() throws Exception {
    try {
      runNonPartExport();
      setUp();
      setupMRImport();
      HCatEximInputFormat.setInput(job, "tmp/exports", null);

      List<HCatFieldSchema> readColumns = new ArrayList<HCatFieldSchema>();
      readColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
          Constants.INT_TYPE_NAME, "")));
      readColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
          Constants.STRING_TYPE_NAME, "")));
      HCatEximInputFormat.setOutputSchema(job, new HCatSchema(readColumns));
      job.waitForCompletion(true);

      assertEquals(4, TestImport.empRecords.size());
      assertEmpDetail(TestImport.empRecords.get(237), "Krishna", null, null, null, null);
      assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", null, null, null, null);
      assertEmpDetail(TestImport.empRecords.get(239), "Satya", null, null, null, null);
      assertEmpDetail(TestImport.empRecords.get(240), "Kavya", null, null, null, null);
    } catch (Exception e) {
      System.out.println("Test failed with " + e.getMessage());
      e.printStackTrace();
      throw e;
    }
  }
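  /** Exports four single-record partitions and reads them all back, including partition values. */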
  public void testPart() throws Exception {
    try {
      if (fs.exists(inputLocation)) {
        fs.delete(inputLocation, true);
      }
      List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2);
      partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, ""));
      partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, ""));
      partSchema = new HCatSchema(partKeys);
      runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn");
      setUp();
      runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka");
      setUp();
      runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn");
      setUp();
      runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka");

      setUp();
      setupMRImport();
      HCatEximInputFormat.setInput(job, "tmp/exports", null);
      job.waitForCompletion(true);

      assertEquals(4, TestImport.empRecords.size());
      assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", "in", "tn");
      assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka");
      assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", "us", "tn");
      assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka");
    } catch (Exception e) {
      System.out.println("Test failed with " + e.getMessage());
      e.printStackTrace();
      throw e;
    }
  }
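  /** Like testPart, but explicitly requests the partition columns in the output schema. */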
  public void testPartWithPartCols() throws Exception {
    try {
      if (fs.exists(inputLocation)) {
        fs.delete(inputLocation, true);
      }
      List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2);
      partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, ""));
      partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, ""));
      partSchema = new HCatSchema(partKeys);
      runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn");
      setUp();
      runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka");
      setUp();
      runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn");
      setUp();
      runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka");

      setUp();
      setupMRImport();
      HCatEximInputFormat.setInput(job, "tmp/exports", null);

      List<HCatFieldSchema> colsPlusPartKeys = new ArrayList<HCatFieldSchema>();
      colsPlusPartKeys.addAll(columns);
      colsPlusPartKeys.addAll(partKeys);
      HCatBaseInputFormat.setOutputSchema(job, new HCatSchema(colsPlusPartKeys));
      job.waitForCompletion(true);

      assertEquals(4, TestImport.empRecords.size());
      assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", "in", "tn");
      assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka");
      assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", "us", "tn");
      assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka");
    } catch (Exception e) {
      System.out.println("Test failed with " + e.getMessage());
      e.printStackTrace();
      throw e;
    }
  }
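  /** Uses a partition filter (emp_state = ka) so that only matching partitions are read. */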
  public void testPartSelection() throws Exception {
    try {
      if (fs.exists(inputLocation)) {
        fs.delete(inputLocation, true);
      }
      List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2);
      partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, ""));
      partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, ""));
      partSchema = new HCatSchema(partKeys);
      runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn");
      setUp();
      runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka");
      setUp();
      runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn");
      setUp();
      runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka");

      setUp();
      setupMRImport();
      Map<String, String> filter = new TreeMap<String, String>();
      filter.put("emp_state", "ka");
      HCatEximInputFormat.setInput(job, "tmp/exports", filter);
      job.waitForCompletion(true);

      assertEquals(2, TestImport.empRecords.size());
      assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka");
      assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka");
    } catch (Exception e) {
      System.out.println("Test failed with " + e.getMessage());
      e.printStackTrace();
      throw e;
    }
  }
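  /** Asserts the given EmpDetails matches the expected field values (null means not read). */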
  private void assertEmpDetail(EmpDetails empDetails, String name, String dob, String mf,
      String country, String state) {
    assertNotNull(empDetails);
    assertEquals(name, empDetails.emp_name);
    assertEquals(dob, empDetails.emp_dob);
    assertEquals(mf, empDetails.emp_sex);
    assertEquals(country, empDetails.emp_country);
    assertEquals(state, empDetails.emp_state);
  }
}