| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.hcatalog.mapreduce; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| |
| import junit.framework.TestCase; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.hadoop.hive.metastore.api.Partition; |
| import org.apache.hadoop.hive.metastore.api.Table; |
| import org.apache.hadoop.hive.ql.parse.EximUtil; |
| import org.apache.hadoop.hive.serde.Constants; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.Mapper; |
| import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; |
| import org.apache.hcatalog.common.HCatConstants; |
| import org.apache.hcatalog.common.HCatUtil; |
| import org.apache.hcatalog.data.DefaultHCatRecord; |
| import org.apache.hcatalog.data.HCatRecord; |
| import org.apache.hcatalog.data.schema.HCatFieldSchema; |
| import org.apache.hcatalog.data.schema.HCatSchema; |
| import org.apache.hcatalog.data.schema.HCatSchemaUtils; |
| |
| /** |
| * |
| * TestHCatEximOutputFormat. Some basic testing here. More testing done via |
| * TestHCatEximInputFormat |
| * |
| */ |
| public class TestHCatEximOutputFormat extends TestCase { |
| |
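  /**
   * Map-only mapper that parses comma-separated employee lines and emits
   * them as HCatRecords, using the table schema published to the job by
   * HCatEximOutputFormat.
   */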
| public static class TestMap extends |
| Mapper<LongWritable, Text, LongWritable, HCatRecord> { |
| |
| private HCatSchema recordSchema; |
| |
| @Override |
| protected void setup(Context context) throws IOException, |
| InterruptedException { |
| super.setup(context); |
| recordSchema = HCatEximOutputFormat.getTableSchema(context); |
| System.out.println("TestMap/setup called"); |
| } |
| |
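    /**
     * Expects lines of the form emp_id,emp_name,emp_dob,emp_sex,emp_country,emp_state.
     * Only the first four fields belong to the record schema; the trailing
     * two are the partition values supplied separately in testPart().
     */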
| @Override |
| public void map(LongWritable key, Text value, Context context) |
| throws IOException, InterruptedException { |
| String[] cols = value.toString().split(","); |
| HCatRecord record = new DefaultHCatRecord(recordSchema.size()); |
| System.out.println("TestMap/map called. Cols[0]:" + cols[0]); |
| System.out.println("TestMap/map called. Cols[1]:" + cols[1]); |
| System.out.println("TestMap/map called. Cols[2]:" + cols[2]); |
| System.out.println("TestMap/map called. Cols[3]:" + cols[3]); |
| record.setInteger("emp_id", recordSchema, Integer.parseInt(cols[0])); |
| record.setString("emp_name", recordSchema, cols[1]); |
| record.setString("emp_dob", recordSchema, cols[2]); |
| record.setString("emp_sex", recordSchema, cols[3]); |
| context.write(key, record); |
| } |
| } |
| |
| |
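  // Fixtures built in setUp(): a four-column employee schema plus input and
  // output locations on the local filesystem.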
| private static final String dbName = "hcatEximOutputFormatTestDB"; |
| private static final String tblName = "hcatEximOutputFormatTestTable"; |
| Configuration conf; |
| Job job; |
| List<HCatFieldSchema> columns; |
| HCatSchema schema; |
| FileSystem fs; |
| Path outputLocation; |
| Path dataLocation; |
| |
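  /**
   * Exports the table without partition keys and verifies that the generated
   * _metadata file describes an RCFile-backed table with no partitions.
   */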
| public void testNonPart() throws Exception { |
| try { |
| HCatEximOutputFormat.setOutput( |
| job, |
| dbName, |
| tblName, |
| outputLocation.toString(), |
| null, |
| null, |
| schema); |
| |
      assertTrue("Job failed", job.waitForCompletion(true));
      HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job, null);
      committer.cleanupJob(job);
| |
| Path metadataPath = new Path(outputLocation, "_metadata"); |
| Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath); |
| Table table = rv.getKey(); |
| List<Partition> partitions = rv.getValue(); |
| |
| assertEquals(dbName, table.getDbName()); |
| assertEquals(tblName, table.getTableName()); |
| assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), |
| HCatUtil.getFieldSchemaList(columns))); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", |
| table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", |
| table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", |
| table.getSd().getInputFormat()); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", |
| table.getSd().getOutputFormat()); |
| assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", |
| table.getSd().getSerdeInfo().getSerializationLib()); |
| assertEquals(0, table.getPartitionKeys().size()); |
| |
| assertEquals(0, partitions.size()); |
| } catch (Exception e) { |
| System.out.println("Test failed with " + e.getMessage()); |
| e.printStackTrace(); |
| throw e; |
| } |
| |
| } |
| |
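  /**
   * Exports a single partition (emp_country=IN, emp_state=TN) and verifies
   * both the table-level and partition-level metadata, including the
   * partition key schema and the stored partition values.
   */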
| public void testPart() throws Exception { |
| try { |
| List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(); |
| partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_country", |
| Constants.STRING_TYPE_NAME, ""))); |
| partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_state", |
| Constants.STRING_TYPE_NAME, ""))); |
| HCatSchema partitionSchema = new HCatSchema(partKeys); |
| |
| List<String> partitionVals = new ArrayList<String>(); |
| partitionVals.add("IN"); |
| partitionVals.add("TN"); |
| |
| HCatEximOutputFormat.setOutput( |
| job, |
| dbName, |
| tblName, |
| outputLocation.toString(), |
| partitionSchema, |
| partitionVals, |
| schema); |
| |
      assertTrue("Job failed", job.waitForCompletion(true));
      HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job, null);
      committer.cleanupJob(job);
| Path metadataPath = new Path(outputLocation, "_metadata"); |
| Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath); |
| Table table = rv.getKey(); |
| List<Partition> partitions = rv.getValue(); |
| |
| assertEquals(dbName, table.getDbName()); |
| assertEquals(tblName, table.getTableName()); |
| assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), |
| HCatUtil.getFieldSchemaList(columns))); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", |
| table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", |
| table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", |
| table.getSd().getInputFormat()); |
| assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", |
| table.getSd().getOutputFormat()); |
| assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", |
| table.getSd().getSerdeInfo().getSerializationLib()); |
| assertEquals(2, table.getPartitionKeys().size()); |
| List<FieldSchema> partSchema = table.getPartitionKeys(); |
| assertEquals("emp_country", partSchema.get(0).getName()); |
| assertEquals("emp_state", partSchema.get(1).getName()); |
| |
| assertEquals(1, partitions.size()); |
| Partition partition = partitions.get(0); |
| assertEquals("IN", partition.getValues().get(0)); |
| assertEquals("TN", partition.getValues().get(1)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", |
| partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); |
| assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", |
| partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); |
| } catch (Exception e) { |
| System.out.println("Test failed with " + e.getMessage()); |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| |
| @Override |
| protected void setUp() throws Exception { |
| System.out.println("Setup started"); |
| super.setUp(); |
| conf = new Configuration(); |
| job = new Job(conf, "test eximoutputformat"); |
| columns = new ArrayList<HCatFieldSchema>(); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", |
| Constants.INT_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob", |
| Constants.STRING_TYPE_NAME, ""))); |
| columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex", |
| Constants.STRING_TYPE_NAME, ""))); |
| schema = new HCatSchema(columns); |
| |
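    // Run against the local filesystem, rooted at the current working
    // directory, so the test does not need a live HDFS cluster.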
| fs = new LocalFileSystem(); |
| fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration()); |
| outputLocation = new Path(fs.getWorkingDirectory(), "tmp/exports"); |
| if (fs.exists(outputLocation)) { |
| fs.delete(outputLocation, true); |
| } |
| dataLocation = new Path(fs.getWorkingDirectory(), "tmp/data"); |
| if (fs.exists(dataLocation)) { |
| fs.delete(dataLocation, true); |
| } |
| FSDataOutputStream ds = fs.create(dataLocation, true); |
| ds.writeBytes("237,Krishna,01/01/1990,M,IN,TN\n"); |
| ds.writeBytes("238,Kalpana,01/01/2000,F,IN,KA\n"); |
| ds.writeBytes("239,Satya,01/01/2001,M,US,TN\n"); |
| ds.writeBytes("240,Kavya,01/01/2002,F,US,KA\n"); |
| ds.close(); |
| |
| job.setInputFormatClass(TextInputFormat.class); |
| job.setOutputFormatClass(HCatEximOutputFormat.class); |
| TextInputFormat.setInputPaths(job, dataLocation); |
| job.setJarByClass(this.getClass()); |
| job.setMapperClass(TestMap.class); |
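    // Map-only job: TestMap writes records directly through HCatEximOutputFormat.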
| job.setNumReduceTasks(0); |
| System.out.println("Setup done"); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| System.out.println("Teardown started"); |
| super.tearDown(); |
| fs.delete(dataLocation, true); |
| fs.delete(outputLocation, true); |
| System.out.println("Teardown done"); |
| } |
| } |