/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hcatalog.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;

/**
 * TestHCatEximOutputFormat. Basic tests of the export output format: one
 * non-partitioned and one partitioned export, each verified through the
 * metadata written by the committer. More testing is done via
 * TestHCatEximInputFormat.
 */
public class TestHCatEximOutputFormat extends TestCase {
public static class TestMap extends
Mapper<LongWritable, Text, LongWritable, HCatRecord> {
private HCatSchema recordSchema;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
recordSchema = HCatEximOutputFormat.getTableSchema(context);
System.out.println("TestMap/setup called");
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
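      // Input rows are CSV (see the sample data written in setUp); only the
      // first four columns become record fields, the trailing country/state
      // columns are ignored by the mapper.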
String[] cols = value.toString().split(",");
HCatRecord record = new DefaultHCatRecord(recordSchema.size());
System.out.println("TestMap/map called. Cols[0]:" + cols[0]);
System.out.println("TestMap/map called. Cols[1]:" + cols[1]);
System.out.println("TestMap/map called. Cols[2]:" + cols[2]);
System.out.println("TestMap/map called. Cols[3]:" + cols[3]);
record.setInteger("emp_id", recordSchema, Integer.parseInt(cols[0]));
record.setString("emp_name", recordSchema, cols[1]);
record.setString("emp_dob", recordSchema, cols[2]);
record.setString("emp_sex", recordSchema, cols[3]);
context.write(key, record);
}
}
private static final String dbName = "hcatEximOutputFormatTestDB";
private static final String tblName = "hcatEximOutputFormatTestTable";
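
  // Fixtures created in setUp and shared by both tests.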
Configuration conf;
Job job;
List<HCatFieldSchema> columns;
HCatSchema schema;
FileSystem fs;
Path outputLocation;
Path dataLocation;
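
  /**
   * Exports to a non-partitioned location and verifies the table metadata
   * written by the committer.
   */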
public void testNonPart() throws Exception {
try {
HCatEximOutputFormat.setOutput(
job,
dbName,
tblName,
outputLocation.toString(),
null,
null,
schema);
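      // Run the map-only export job, then invoke the committer so that the
      // export metadata gets written out.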
      assertTrue(job.waitForCompletion(true));
      HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job, null);
      committer.cleanupJob(job);
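      // Read the _metadata file back and check the table description.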
Path metadataPath = new Path(outputLocation, "_metadata");
Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);
Table table = rv.getKey();
List<Partition> partitions = rv.getValue();
assertEquals(dbName, table.getDbName());
assertEquals(tblName, table.getTableName());
assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
HCatUtil.getFieldSchemaList(columns)));
assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
table.getSd().getInputFormat());
assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
table.getSd().getOutputFormat());
assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
table.getSd().getSerdeInfo().getSerializationLib());
assertEquals(0, table.getPartitionKeys().size());
assertEquals(0, partitions.size());
} catch (Exception e) {
System.out.println("Test failed with " + e.getMessage());
e.printStackTrace();
throw e;
}
}
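
  /**
   * Exports a single partition (emp_country=IN, emp_state=TN) and verifies
   * both the table and the partition metadata.
   */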
public void testPart() throws Exception {
try {
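      // Two string partition keys plus the value pair identifying the one
      // partition this job writes.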
List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>();
partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_country",
Constants.STRING_TYPE_NAME, "")));
partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_state",
Constants.STRING_TYPE_NAME, "")));
HCatSchema partitionSchema = new HCatSchema(partKeys);
List<String> partitionVals = new ArrayList<String>();
partitionVals.add("IN");
partitionVals.add("TN");
HCatEximOutputFormat.setOutput(
job,
dbName,
tblName,
outputLocation.toString(),
partitionSchema,
partitionVals,
schema);
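      // Run the job and commit so the export metadata is written.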
      assertTrue(job.waitForCompletion(true));
      HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job, null);
      committer.cleanupJob(job);
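      // This time the metadata must also describe the exported partition.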
Path metadataPath = new Path(outputLocation, "_metadata");
Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);
Table table = rv.getKey();
List<Partition> partitions = rv.getValue();
assertEquals(dbName, table.getDbName());
assertEquals(tblName, table.getTableName());
assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
HCatUtil.getFieldSchemaList(columns)));
assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
table.getSd().getInputFormat());
assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
table.getSd().getOutputFormat());
assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
table.getSd().getSerdeInfo().getSerializationLib());
assertEquals(2, table.getPartitionKeys().size());
List<FieldSchema> partSchema = table.getPartitionKeys();
assertEquals("emp_country", partSchema.get(0).getName());
assertEquals("emp_state", partSchema.get(1).getName());
assertEquals(1, partitions.size());
Partition partition = partitions.get(0);
assertEquals("IN", partition.getValues().get(0));
assertEquals("TN", partition.getValues().get(1));
assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
} catch (Exception e) {
System.out.println("Test failed with " + e.getMessage());
e.printStackTrace();
throw e;
}
}
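
  /**
   * Builds the four-column employee schema, writes sample CSV input to the
   * local filesystem, and configures a map-only job that writes through
   * HCatEximOutputFormat.
   */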
@Override
protected void setUp() throws Exception {
System.out.println("Setup started");
super.setUp();
conf = new Configuration();
job = new Job(conf, "test eximoutputformat");
columns = new ArrayList<HCatFieldSchema>();
columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
Constants.INT_TYPE_NAME, "")));
columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
Constants.STRING_TYPE_NAME, "")));
columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob",
Constants.STRING_TYPE_NAME, "")));
columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex",
Constants.STRING_TYPE_NAME, "")));
schema = new HCatSchema(columns);
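    // Run against the local filesystem so the test needs no cluster.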
fs = new LocalFileSystem();
fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
outputLocation = new Path(fs.getWorkingDirectory(), "tmp/exports");
if (fs.exists(outputLocation)) {
fs.delete(outputLocation, true);
}
dataLocation = new Path(fs.getWorkingDirectory(), "tmp/data");
if (fs.exists(dataLocation)) {
fs.delete(dataLocation, true);
}
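    // Sample employee rows: id, name, dob, sex, country, state.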
FSDataOutputStream ds = fs.create(dataLocation, true);
ds.writeBytes("237,Krishna,01/01/1990,M,IN,TN\n");
ds.writeBytes("238,Kalpana,01/01/2000,F,IN,KA\n");
ds.writeBytes("239,Satya,01/01/2001,M,US,TN\n");
ds.writeBytes("240,Kavya,01/01/2002,F,US,KA\n");
ds.close();
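    // Map-only job: TextInputFormat in, HCatEximOutputFormat out.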
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HCatEximOutputFormat.class);
TextInputFormat.setInputPaths(job, dataLocation);
job.setJarByClass(this.getClass());
job.setMapperClass(TestMap.class);
job.setNumReduceTasks(0);
System.out.println("Setup done");
}
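
  /** Removes the data and export directories created during setUp. */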
@Override
protected void tearDown() throws Exception {
System.out.println("Teardown started");
super.tearDown();
fs.delete(dataLocation, true);
fs.delete(outputLocation, true);
System.out.println("Teardown done");
}
}