| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.hcatalog.mapreduce; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TreeMap; |
| |
| import junit.framework.TestCase; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.hadoop.hive.serde.Constants; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; |
| import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; |
| import org.apache.hcatalog.data.DefaultHCatRecord; |
| import org.apache.hcatalog.data.HCatRecord; |
| import org.apache.hcatalog.data.schema.HCatFieldSchema; |
| import org.apache.hcatalog.data.schema.HCatSchema; |
| import org.apache.hcatalog.data.schema.HCatSchemaUtils; |
| import org.apache.hcatalog.mapreduce.TestHCatEximInputFormat.TestImport.EmpDetails; |
| |
| /** |
| * |
| * TestHCatEximInputFormat. tests primarily HCatEximInputFormat but |
| * also HCatEximOutputFormat. |
| * |
| */ |
| public class TestHCatEximInputFormat extends TestCase { |
| |
| public static class TestExport extends |
| org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, HCatRecord> { |
| |
| private HCatSchema recordSchema; |
| |
| @Override |
| protected void setup(Context context) throws IOException, |
| InterruptedException { |
| super.setup(context); |
| recordSchema = HCatEximOutputFormat.getTableSchema(context); |
| } |
| |
| @Override |
| public void map(LongWritable key, Text value, Context context) |
| throws IOException, InterruptedException { |
| String[] cols = value.toString().split(","); |
| HCatRecord record = new DefaultHCatRecord(recordSchema.size()); |
| record.setInteger("emp_id", recordSchema, Integer.parseInt(cols[0])); |
| record.setString("emp_name", recordSchema, cols[1]); |
| record.setString("emp_dob", recordSchema, cols[2]); |
| record.setString("emp_sex", recordSchema, cols[3]); |
| context.write(key, record); |
| } |
| } |
| |
| public static class TestImport extends |
| org.apache.hadoop.mapreduce.Mapper< |
| org.apache.hadoop.io.LongWritable, HCatRecord, |
| org.apache.hadoop.io.Text, |
| org.apache.hadoop.io.Text> { |
| |
| private HCatSchema recordSchema; |
| |
| public static class EmpDetails { |
| public String emp_name; |
| public String emp_dob; |
| public String emp_sex; |
| public String emp_country; |
| public String emp_state; |
| } |
| |
| public static Map<Integer, EmpDetails> empRecords = new TreeMap<Integer, EmpDetails>(); |
| |
| @Override |
| protected void setup(Context context) throws IOException, |
| InterruptedException { |
| super.setup(context); |
| try { |
| recordSchema = HCatBaseInputFormat.getOutputSchema(context); |
| } catch (Exception e) { |
| throw new IOException("Error getting outputschema from job configuration", e); |
| } |
| System.out.println("RecordSchema : " + recordSchema.toString()); |
| } |
| |
| @Override |
| public void map(LongWritable key, HCatRecord value, Context context) |
| throws IOException, InterruptedException { |
| EmpDetails empDetails = new EmpDetails(); |
| Integer emp_id = value.getInteger("emp_id", recordSchema); |
| String emp_name = value.getString("emp_name", recordSchema); |
| empDetails.emp_name = emp_name; |
| if (recordSchema.getPosition("emp_dob") != null) { |
| empDetails.emp_dob = value.getString("emp_dob", recordSchema); |
| } |
| if (recordSchema.getPosition("emp_sex") != null) { |
| empDetails.emp_sex = value.getString("emp_sex", recordSchema); |
| } |
| if (recordSchema.getPosition("emp_country") != null) { |
| empDetails.emp_country = value.getString("emp_country", recordSchema); |
| } |
| if (recordSchema.getPosition("emp_state") != null) { |
| empDetails.emp_state = value.getString("emp_state", recordSchema); |
| } |
| empRecords.put(emp_id, empDetails); |
| } |
| } |
| |
| private static final String dbName = "hcatEximOutputFormatTestDB"; |
| private static final String tblName = "hcatEximOutputFormatTestTable"; |
| Configuration conf; |
| Job job; |
| List<HCatFieldSchema> columns; |
| HCatSchema schema; |
| FileSystem fs; |
| Path inputLocation; |
| Path outputLocation; |
| private HCatSchema partSchema; |
| |
| |
| @Override |
| protected void setUp() throws Exception { |
| System.out.println("Setup started"); |
| super.setUp(); |
| conf = new Configuration(); |
| job = new Job(conf, "test eximinputformat"); |
| columns = new ArrayList<HCatFieldSchema>(); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", |
| Constants.INT_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex", |
| Constants.STRING_TYPE_NAME, ""))); |
| schema = new HCatSchema(columns); |
| |
| fs = new LocalFileSystem(); |
| fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration()); |
| inputLocation = new Path(fs.getWorkingDirectory(), "tmp/exports"); |
| outputLocation = new Path(fs.getWorkingDirectory(), "tmp/data"); |
| |
| job.setJarByClass(this.getClass()); |
| job.setNumReduceTasks(0); |
| System.out.println("Setup done"); |
| } |
| |
| private void setupMRExport(String[] records) throws IOException { |
| if (fs.exists(outputLocation)) { |
| fs.delete(outputLocation, true); |
| } |
| FSDataOutputStream ds = fs.create(outputLocation, true); |
| for (String record : records) { |
| ds.writeBytes(record); |
| } |
| ds.close(); |
| job.setInputFormatClass(TextInputFormat.class); |
| job.setOutputFormatClass(HCatEximOutputFormat.class); |
| TextInputFormat.setInputPaths(job, outputLocation); |
| job.setMapperClass(TestExport.class); |
| } |
| |
| private void setupMRImport() throws IOException { |
| if (fs.exists(outputLocation)) { |
| fs.delete(outputLocation, true); |
| } |
| job.setInputFormatClass(HCatEximInputFormat.class); |
| job.setOutputFormatClass(TextOutputFormat.class); |
| TextOutputFormat.setOutputPath(job, outputLocation); |
| job.setMapperClass(TestImport.class); |
| TestImport.empRecords.clear(); |
| } |
| |
| |
| @Override |
| protected void tearDown() throws Exception { |
| System.out.println("Teardown started"); |
| super.tearDown(); |
| // fs.delete(inputLocation, true); |
| // fs.delete(outputLocation, true); |
| System.out.println("Teardown done"); |
| } |
| |
| |
| private void runNonPartExport() throws IOException, InterruptedException, ClassNotFoundException { |
| if (fs.exists(inputLocation)) { |
| fs.delete(inputLocation, true); |
| } |
| setupMRExport(new String[] { |
| "237,Krishna,01/01/1990,M,IN,TN\n", |
| "238,Kalpana,01/01/2000,F,IN,KA\n", |
| "239,Satya,01/01/2001,M,US,TN\n", |
| "240,Kavya,01/01/2002,F,US,KA\n" |
| |
| }); |
| HCatEximOutputFormat.setOutput( |
| job, |
| dbName, |
| tblName, |
| inputLocation.toString(), |
| null, |
| null, |
| schema); |
| |
| job.waitForCompletion(true); |
| HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null); |
| committer.cleanupJob(job); |
| } |
| |
| private void runPartExport(String record, String country, String state) throws IOException, InterruptedException, ClassNotFoundException { |
| setupMRExport(new String[] {record}); |
| List<String> partValues = new ArrayList<String>(2); |
| partValues.add(country); |
| partValues.add(state); |
| HCatEximOutputFormat.setOutput( |
| job, |
| dbName, |
| tblName, |
| inputLocation.toString(), |
| partSchema , |
| partValues , |
| schema); |
| |
| job.waitForCompletion(true); |
| HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null); |
| committer.cleanupJob(job); |
| } |
| |
| public void testNonPart() throws Exception { |
| try { |
| runNonPartExport(); |
| setUp(); |
| setupMRImport(); |
| HCatEximInputFormat.setInput(job, "tmp/exports", null); |
| job.waitForCompletion(true); |
| |
| assertEquals(4, TestImport.empRecords.size()); |
| assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", null, null); |
| assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", null, null); |
| assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", null, null); |
| assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", null, null); |
| } catch (Exception e) { |
| System.out.println("Test failed with " + e.getMessage()); |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| |
| public void testNonPartProjection() throws Exception { |
| try { |
| |
| runNonPartExport(); |
| setUp(); |
| setupMRImport(); |
| HCatEximInputFormat.setInput(job, "tmp/exports", null); |
| |
| List<HCatFieldSchema> readColumns = new ArrayList<HCatFieldSchema>(); |
| readColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", |
| Constants.INT_TYPE_NAME, ""))); |
| readColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", |
| Constants.STRING_TYPE_NAME, ""))); |
| |
| HCatEximInputFormat.setOutputSchema(job, new HCatSchema(readColumns)); |
| job.waitForCompletion(true); |
| |
| assertEquals(4, TestImport.empRecords.size()); |
| assertEmpDetail(TestImport.empRecords.get(237), "Krishna", null, null, null, null); |
| assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", null, null, null, null); |
| assertEmpDetail(TestImport.empRecords.get(239), "Satya", null, null, null, null); |
| assertEmpDetail(TestImport.empRecords.get(240), "Kavya", null, null, null, null); |
| } catch (Exception e) { |
| System.out.println("Test failed with " + e.getMessage()); |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| |
| public void testPart() throws Exception { |
| try { |
| if (fs.exists(inputLocation)) { |
| fs.delete(inputLocation, true); |
| } |
| |
| List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2); |
| partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, "")); |
| partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, "")); |
| partSchema = new HCatSchema(partKeys); |
| |
| runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn"); |
| setUp(); |
| runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka"); |
| setUp(); |
| runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn"); |
| setUp(); |
| runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka"); |
| |
| setUp(); |
| setupMRImport(); |
| HCatEximInputFormat.setInput(job, "tmp/exports", null); |
| job.waitForCompletion(true); |
| |
| assertEquals(4, TestImport.empRecords.size()); |
| assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", "in", "tn"); |
| assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka"); |
| assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", "us", "tn"); |
| assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka"); |
| } catch (Exception e) { |
| System.out.println("Test failed with " + e.getMessage()); |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| |
| public void testPartWithPartCols() throws Exception { |
| try { |
| if (fs.exists(inputLocation)) { |
| fs.delete(inputLocation, true); |
| } |
| |
| List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2); |
| partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, "")); |
| partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, "")); |
| partSchema = new HCatSchema(partKeys); |
| |
| runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn"); |
| setUp(); |
| runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka"); |
| setUp(); |
| runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn"); |
| setUp(); |
| runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka"); |
| |
| setUp(); |
| setupMRImport(); |
| HCatEximInputFormat.setInput(job, "tmp/exports", null); |
| |
| List<HCatFieldSchema> colsPlusPartKeys = new ArrayList<HCatFieldSchema>(); |
| colsPlusPartKeys.addAll(columns); |
| colsPlusPartKeys.addAll(partKeys); |
| |
| HCatBaseInputFormat.setOutputSchema(job, new HCatSchema(colsPlusPartKeys)); |
| job.waitForCompletion(true); |
| |
| assertEquals(4, TestImport.empRecords.size()); |
| assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", "in", "tn"); |
| assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka"); |
| assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", "us", "tn"); |
| assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka"); |
| } catch (Exception e) { |
| System.out.println("Test failed with " + e.getMessage()); |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| |
| |
| public void testPartSelection() throws Exception { |
| try { |
| if (fs.exists(inputLocation)) { |
| fs.delete(inputLocation, true); |
| } |
| |
| List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2); |
| partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, "")); |
| partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, "")); |
| partSchema = new HCatSchema(partKeys); |
| |
| runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn"); |
| setUp(); |
| runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka"); |
| setUp(); |
| runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn"); |
| setUp(); |
| runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka"); |
| |
| setUp(); |
| setupMRImport(); |
| Map<String, String> filter = new TreeMap<String, String>(); |
| filter.put("emp_state", "ka"); |
| HCatEximInputFormat.setInput(job, "tmp/exports", filter); |
| job.waitForCompletion(true); |
| |
| assertEquals(2, TestImport.empRecords.size()); |
| assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka"); |
| assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka"); |
| } catch (Exception e) { |
| System.out.println("Test failed with " + e.getMessage()); |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| |
| |
| private void assertEmpDetail(EmpDetails empDetails, String name, String dob, String mf, String country, String state) { |
| assertNotNull(empDetails); |
| assertEquals(name, empDetails.emp_name); |
| assertEquals(dob, empDetails.emp_dob); |
| assertEquals(mf, empDetails.emp_sex); |
| assertEquals(country, empDetails.emp_country); |
| assertEquals(state, empDetails.emp_state); |
| } |
| |
| } |