| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hcatalog.rcfile; |
| |
| import java.io.IOException; |
| import java.io.UnsupportedEncodingException; |
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
| |
| import junit.framework.Assert; |
| import junit.framework.TestCase; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.hadoop.hive.ql.io.RCFile; |
| import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; |
| import org.apache.hadoop.hive.serde.Constants; |
| import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; |
| import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; |
| import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; |
| import org.apache.hadoop.io.compress.DefaultCodec; |
| import org.apache.hadoop.mapreduce.InputFormat; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.JobID; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| import org.apache.hcatalog.common.HCatException; |
| import org.apache.hcatalog.common.HCatUtil; |
| import org.apache.hcatalog.data.DefaultHCatRecord; |
| import org.apache.hcatalog.data.HCatDataCheckUtil; |
| import org.apache.hcatalog.data.HCatRecord; |
| import org.apache.hcatalog.data.schema.HCatSchema; |
| import org.apache.hcatalog.shims.HCatHadoopShims; |
| |
| |
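/**
 * Tests {@link RCFileInputDriver}: reads rows written to an RCFile back as
 * {@link HCatRecord}s, exercising the full schema, a pruned schema, and a
 * reordered schema that includes a partition column and a column absent
 * from the stored data.
 */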
public class TestRCFileInputStorageDriver extends TestCase {
| private static final Configuration conf = new Configuration(); |
| private static final Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred"); |
| private static final Path file = new Path(dir, "test_rcfile"); |
| private final HCatHadoopShims shim = HCatHadoopShims.Instance.get(); |
| |
| // Generate sample records to compare against |
| private byte[][][] getRecords() throws UnsupportedEncodingException { |
| byte[][] record_1 = {"123".getBytes("UTF-8"), "456".getBytes("UTF-8"), |
| "789".getBytes("UTF-8"), "1000".getBytes("UTF-8"), |
| "5.3".getBytes("UTF-8"), "hcatalog and hadoop".getBytes("UTF-8"), |
| new byte[0], "\\N".getBytes("UTF-8")}; |
| byte[][] record_2 = {"100".getBytes("UTF-8"), "200".getBytes("UTF-8"), |
| "123".getBytes("UTF-8"), "1000".getBytes("UTF-8"), |
| "5.3".getBytes("UTF-8"), "hcatalog and hadoop".getBytes("UTF-8"), |
| new byte[0], "\\N".getBytes("UTF-8")}; |
| return new byte[][][]{record_1, record_2}; |
| } |
| |
| // Write sample records to file for individual tests |
| private BytesRefArrayWritable[] initTestEnvironment() throws IOException { |
| FileSystem fs = FileSystem.getLocal(conf); |
| fs.delete(file, true); |
| |
    byte[][][] records = getRecords();
| RCFileOutputFormat.setColumnNumber(conf, 8); |
| RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null, new DefaultCodec()); |
| |
| BytesRefArrayWritable bytes = writeBytesToFile(records[0], writer); |
| BytesRefArrayWritable bytes2 = writeBytesToFile(records[1], writer); |
| |
| writer.close(); |
    return new BytesRefArrayWritable[]{bytes, bytes2};
| } |
| |
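  // Wraps each column of a record in a BytesRefWritable, appends the row to
  // the writer, and returns the writable for later comparison.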
| private BytesRefArrayWritable writeBytesToFile(byte[][] record, RCFile.Writer writer) throws IOException { |
| BytesRefArrayWritable bytes = new BytesRefArrayWritable(record.length); |
| for (int i = 0; i < record.length; i++) { |
| BytesRefWritable cu = new BytesRefWritable(record[i], 0, record[i].length); |
| bytes.set(i, cu); |
| } |
| writer.append(bytes); |
| return bytes; |
| } |
| |
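  // Reading back with the output schema equal to the original schema should
  // reproduce the full eight-field records.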
  public void testConvertValueToTuple() throws IOException, InterruptedException {
| BytesRefArrayWritable[] bytesArr = initTestEnvironment(); |
| |
| HCatSchema schema = buildHiveSchema(); |
| RCFileInputDriver sd = new RCFileInputDriver(); |
| JobContext jc = shim.createJobContext(conf, new JobID()); |
| sd.setInputPath(jc, file.toString()); |
| InputFormat<?,?> iF = sd.getInputFormat(null); |
| InputSplit split = iF.getSplits(jc).get(0); |
| sd.setOriginalSchema(jc, schema); |
| sd.setOutputSchema(jc, schema); |
| sd.initialize(jc, getProps()); |
| |
| TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID()); |
| RecordReader<?,?> rr = iF.createRecordReader(split,tac); |
| rr.initialize(split, tac); |
| HCatRecord[] tuples = getExpectedRecords(); |
    for (int j = 0; j < 2; j++) {
      Assert.assertTrue(rr.nextKeyValue());
      BytesRefArrayWritable w = (BytesRefArrayWritable) rr.getCurrentValue();
      Assert.assertEquals(bytesArr[j], w);
      HCatRecord t = sd.convertToHCatRecord(null, w);
      Assert.assertEquals(8, t.size());
      Assert.assertTrue(HCatDataCheckUtil.recordsEqual(t, tuples[j]));
    }
    Assert.assertFalse(rr.nextKeyValue());
  }
| |
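  // With a pruned output schema the driver should return five-field records,
  // while the underlying writable still carries all eight columns.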
  public void testPruning() throws IOException, InterruptedException {
| BytesRefArrayWritable[] bytesArr = initTestEnvironment(); |
| |
| RCFileInputDriver sd = new RCFileInputDriver(); |
| JobContext jc = shim.createJobContext(conf, new JobID()); |
| sd.setInputPath(jc, file.toString()); |
| InputFormat<?,?> iF = sd.getInputFormat(null); |
| InputSplit split = iF.getSplits(jc).get(0); |
| sd.setOriginalSchema(jc, buildHiveSchema()); |
| sd.setOutputSchema(jc, buildPrunedSchema()); |
| |
| sd.initialize(jc, getProps()); |
    // Propagate the column projection chosen by the driver into the conf used
    // for the task attempt.
    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
        jc.getConfiguration().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
| TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID()); |
| RecordReader<?,?> rr = iF.createRecordReader(split,tac); |
| rr.initialize(split, tac); |
| HCatRecord[] tuples = getPrunedRecords(); |
    for (int j = 0; j < 2; j++) {
      Assert.assertTrue(rr.nextKeyValue());
      BytesRefArrayWritable w = (BytesRefArrayWritable) rr.getCurrentValue();
      Assert.assertFalse(bytesArr[j].equals(w));
      Assert.assertEquals(8, w.size());
      HCatRecord t = sd.convertToHCatRecord(null, w);
      Assert.assertEquals(5, t.size());
      Assert.assertTrue(HCatDataCheckUtil.recordsEqual(t, tuples[j]));
    }
    Assert.assertFalse(rr.nextKeyValue());
| } |
| |
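  // A reordered output schema, including the partition column "part1" and the
  // new column "newCol", should yield seven-field records in schema order.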
  public void testReorderedCols() throws IOException, InterruptedException {
| BytesRefArrayWritable[] bytesArr = initTestEnvironment(); |
| |
| RCFileInputDriver sd = new RCFileInputDriver(); |
| JobContext jc = shim.createJobContext(conf, new JobID()); |
| sd.setInputPath(jc, file.toString()); |
| InputFormat<?,?> iF = sd.getInputFormat(null); |
| InputSplit split = iF.getSplits(jc).get(0); |
| sd.setOriginalSchema(jc, buildHiveSchema()); |
| sd.setOutputSchema(jc, buildReorderedSchema()); |
| |
| sd.initialize(jc, getProps()); |
    Map<String, String> map = new HashMap<String, String>(1);
    map.put("part1", "first-part");
    sd.setPartitionValues(jc, map);
    // Propagate the column projection chosen by the driver into the conf used
    // for the task attempt.
    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
        jc.getConfiguration().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
| TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID()); |
| RecordReader<?,?> rr = iF.createRecordReader(split,tac); |
| rr.initialize(split, tac); |
| HCatRecord[] tuples = getReorderedCols(); |
    for (int j = 0; j < 2; j++) {
      Assert.assertTrue(rr.nextKeyValue());
      BytesRefArrayWritable w = (BytesRefArrayWritable) rr.getCurrentValue();
      Assert.assertFalse(bytesArr[j].equals(w));
      Assert.assertEquals(8, w.size());
      HCatRecord t = sd.convertToHCatRecord(null, w);
      Assert.assertEquals(7, t.size());
      Assert.assertTrue(HCatDataCheckUtil.recordsEqual(t, tuples[j]));
    }
    Assert.assertFalse(rr.nextKeyValue());
| } |
  private HCatRecord[] getExpectedRecords() {
    List<Object> rec_1 = new ArrayList<Object>(8);
    Collections.addAll(rec_1, Byte.valueOf("123"),
        Short.valueOf("456"),
        Integer.valueOf(789),
        Long.valueOf(1000L),
        Double.valueOf(5.3D),
        "hcatalog and hadoop",
        null,
        null);
| |
| HCatRecord tup_1 = new DefaultHCatRecord(rec_1); |
| |
| List<Object> rec_2 = new ArrayList<Object>(8); |
    Collections.addAll(rec_2, Byte.valueOf("100"),
        Short.valueOf("200"),
        Integer.valueOf(123),
        Long.valueOf(1000L),
        Double.valueOf(5.3D),
        "hcatalog and hadoop",
        null,
        null);
| HCatRecord tup_2 = new DefaultHCatRecord(rec_2); |
| |
    return new HCatRecord[]{tup_1, tup_2};
| } |
| |
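  // Expected records for the pruned schema: atinyint, aint, adouble, astring,
  // anullint.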
  private HCatRecord[] getPrunedRecords() {
    List<Object> rec_1 = new ArrayList<Object>(5);
    Collections.addAll(rec_1, Byte.valueOf("123"),
        Integer.valueOf(789),
        Double.valueOf(5.3D),
        "hcatalog and hadoop",
        null);
| HCatRecord tup_1 = new DefaultHCatRecord(rec_1); |
| |
    List<Object> rec_2 = new ArrayList<Object>(5);
    Collections.addAll(rec_2, Byte.valueOf("100"),
        Integer.valueOf(123),
        Double.valueOf(5.3D),
        "hcatalog and hadoop",
        null);
| HCatRecord tup_2 = new DefaultHCatRecord(rec_2); |
| |
    return new HCatRecord[]{tup_1, tup_2};
| } |
| |
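  // Full eight-column schema matching the records written in
  // initTestEnvironment().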
  private HCatSchema buildHiveSchema() throws HCatException {
    return new HCatSchema(HCatUtil.getHCatFieldSchemaList(new FieldSchema("atinyint", "tinyint", ""),
| return new HCatSchema(HCatUtil.getHCatFieldSchemaList(new FieldSchema("atinyint", "tinyint", ""), |
| new FieldSchema("asmallint", "smallint", ""), |
| new FieldSchema("aint", "int", ""), |
| new FieldSchema("along", "bigint", ""), |
| new FieldSchema("adouble", "double", ""), |
| new FieldSchema("astring", "string", ""), |
| new FieldSchema("anullint", "int", ""), |
| new FieldSchema("anullstring", "string", ""))); |
| } |
| |
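  // Five-column projection of the full schema.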
  private HCatSchema buildPrunedSchema() throws HCatException {
| return new HCatSchema(HCatUtil.getHCatFieldSchemaList(new FieldSchema("atinyint", "tinyint", ""), |
| new FieldSchema("aint", "int", ""), |
| new FieldSchema("adouble", "double", ""), |
| new FieldSchema("astring", "string", ""), |
| new FieldSchema("anullint", "int", ""))); |
| } |
| |
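  // Reordered schema: interleaves the partition column "part1" and a column
  // "newCol" that does not exist in the stored data.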
  private HCatSchema buildReorderedSchema() throws HCatException {
| return new HCatSchema(HCatUtil.getHCatFieldSchemaList(new FieldSchema("aint", "int", ""), |
| new FieldSchema("part1", "string", ""), |
| new FieldSchema("adouble", "double", ""), |
| new FieldSchema("newCol", "tinyint", ""), |
| new FieldSchema("astring", "string", ""), |
| new FieldSchema("atinyint", "tinyint", ""), |
| new FieldSchema("anullint", "int", ""))); |
| } |
| |
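  // Expected records for the reordered schema; "newCol" reads as null and
  // "part1" takes the partition value "first-part".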
  private HCatRecord[] getReorderedCols() {
    List<Object> rec_1 = new ArrayList<Object>(7);
    Collections.addAll(rec_1, Integer.valueOf(789),
        "first-part",
        Double.valueOf(5.3D),
        null, // newCol, absent from the stored data
        "hcatalog and hadoop",
        Byte.valueOf("123"),
        null);
| HCatRecord tup_1 = new DefaultHCatRecord(rec_1); |
| |
    List<Object> rec_2 = new ArrayList<Object>(7);
    Collections.addAll(rec_2, Integer.valueOf(123),
        "first-part",
        Double.valueOf(5.3D),
        null, // newCol, absent from the stored data
        "hcatalog and hadoop",
        Byte.valueOf("100"),
        null);
| HCatRecord tup_2 = new DefaultHCatRecord(rec_2); |
| |
    return new HCatRecord[]{tup_1, tup_2};
  }
  private Properties getProps() {
| Properties props = new Properties(); |
| props.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "\\N"); |
| props.setProperty(Constants.SERIALIZATION_FORMAT, "9"); |
| return props; |
| } |
| } |