/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.rcfile;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Properties;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hcatalog.shims.HCatHadoopShims;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TestRCFileMapReduceInputFormat: verifies that {@link RCFileMapReduceInputFormat}
* returns every written record exactly once, regardless of where split boundaries
* fall relative to RCFile sync markers.
*/
public class TestRCFileMapReduceInputFormat extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(TestRCFileMapReduceInputFormat.class);
private static Configuration conf = new Configuration();
private static ColumnarSerDe serDe;
private static Path file;
private static FileSystem fs;
private static Properties tbl;
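// One-time setup: a local FileSystem, a scratch directory under test.data.dir,
// and a ColumnarSerDe initialized from the table properties built in createProperties().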
static {
try {
fs = FileSystem.getLocal(conf);
Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
file = new Path(dir, "test_rcfile");
fs.delete(dir, true);
// the SerDe part is from TestLazySimpleSerDe
serDe = new ColumnarSerDe();
// Create the SerDe
tbl = createProperties();
serDe.initialize(conf, tbl);
} catch (Exception e) {
// Fail fast instead of silently continuing with broken fixtures.
throw new RuntimeException("Failed to initialize test fixtures", e);
}
}
private static BytesRefArrayWritable partialS = new BytesRefArrayWritable();
private static byte[][] bytesArray = null;
private static BytesRefArrayWritable s = null;
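// Row fixtures: bytesArray and s hold one full eight-column row matching the schema
// in createProperties(); partialS mirrors it with most columns set to the "NULL" marker.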
static {
try {
bytesArray = new byte[][]{"123".getBytes("UTF-8"),
"456".getBytes("UTF-8"), "789".getBytes("UTF-8"),
"1000".getBytes("UTF-8"), "5.3".getBytes("UTF-8"),
"hive and hadoop".getBytes("UTF-8"), new byte[0],
"NULL".getBytes("UTF-8")};
s = new BytesRefArrayWritable(bytesArray.length);
s.set(0, new BytesRefWritable("123".getBytes("UTF-8")));
s.set(1, new BytesRefWritable("456".getBytes("UTF-8")));
s.set(2, new BytesRefWritable("789".getBytes("UTF-8")));
s.set(3, new BytesRefWritable("1000".getBytes("UTF-8")));
s.set(4, new BytesRefWritable("5.3".getBytes("UTF-8")));
s.set(5, new BytesRefWritable("hive and hadoop".getBytes("UTF-8")));
s.set(6, new BytesRefWritable("NULL".getBytes("UTF-8")));
s.set(7, new BytesRefWritable("NULL".getBytes("UTF-8")));
// partial test init: the same row with most columns set to the NULL marker
partialS.set(0, new BytesRefWritable("NULL".getBytes("UTF-8")));
partialS.set(1, new BytesRefWritable("NULL".getBytes("UTF-8")));
partialS.set(2, new BytesRefWritable("789".getBytes("UTF-8")));
partialS.set(3, new BytesRefWritable("1000".getBytes("UTF-8")));
partialS.set(4, new BytesRefWritable("NULL".getBytes("UTF-8")));
partialS.set(5, new BytesRefWritable("NULL".getBytes("UTF-8")));
partialS.set(6, new BytesRefWritable("NULL".getBytes("UTF-8")));
partialS.set(7, new BytesRefWritable("NULL".getBytes("UTF-8")));
} catch (UnsupportedEncodingException e) {
// UTF-8 is guaranteed by the JVM, so this should never happen.
throw new RuntimeException("UTF-8 encoding is not supported", e);
}
}
/** For debugging and testing. */
public static void main(String[] args) throws Exception {
int count = 10000;
boolean create = true;
String usage = "Usage: TestRCFileMapReduceInputFormat [-count N] file";
if (args.length == 0) {
LOG.error(usage);
System.exit(-1);
}
try {
for (int i = 0; i < args.length; ++i) { // parse command line
if (args[i] == null) {
continue;
} else if (args[i].equals("-count")) {
count = Integer.parseInt(args[++i]);
} else {
// file is required parameter
file = new Path(args[i]);
}
}
if (file == null) {
LOG.error(usage);
System.exit(-1);
}
LOG.info("count = {}", count);
LOG.info("create = {}", create);
LOG.info("file = {}", file);
// test.performanceTest();
LOG.info("Finished.");
} finally {
fs.close();
}
}
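/**
* Builds the ColumnarSerDe table properties: an eight-column schema matching the
* test rows above, with "NULL" as the serialized null marker.
*/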
private static Properties createProperties() {
Properties tbl = new Properties();
// Set the configuration parameters
tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
tbl.setProperty("columns",
"abyte,ashort,aint,along,adouble,astring,anullint,anullstring");
tbl.setProperty("columns.types",
"tinyint:smallint:int:bigint:double:string:int:string");
tbl.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
return tbl;
}
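/**
* Each case writes an RCFile with a fixed record interval, so row groups and their
* sync markers land at predictable offsets, then caps mapred.max.split.size so the
* split boundary falls before, right before, inside, right after, or well after a
* sync marker. The byte offsets below are tuned to this fixed test data; in every
* case the check is simply that each written record is read back exactly once.
*/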
public void testSyncAndSplit() throws IOException, InterruptedException {
splitBeforeSync();
splitRightBeforeSync();
splitInMiddleOfSync();
splitRightAfterSync();
splitAfterSync();
}
private void splitBeforeSync() throws IOException, InterruptedException {
writeThenReadByRecordReader(600, 1000, 2, 17684, null);
}
private void splitRightBeforeSync() throws IOException, InterruptedException {
writeThenReadByRecordReader(500, 1000, 2, 17750, null);
}
private void splitInMiddleOfSync() throws IOException, InterruptedException {
writeThenReadByRecordReader(500, 1000, 2, 17760, null);
}
private void splitRightAfterSync() throws IOException, InterruptedException {
writeThenReadByRecordReader(500, 1000, 2, 17770, null);
}
private void splitAfterSync() throws IOException, InterruptedException {
writeThenReadByRecordReader(500, 1000, 2, 19950, null);
}
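/**
* Writes writeCount copies of the test row into an RCFile using the given record
* interval and codec, then reads the file back through RCFileMapReduceInputFormat
* with maxSplitSize as the split cap, asserting that splitNumber splits are produced
* and that every record is returned exactly once.
*/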
private void writeThenReadByRecordReader(int intervalRecordCount,
int writeCount, int splitNumber, long maxSplitSize, CompressionCodec codec)
throws IOException, InterruptedException {
Path testDir = new Path(System.getProperty("test.data.dir", ".")
+ "/mapred/testsmallfirstsplit");
Path testFile = new Path(testDir, "test_rcfile");
fs.delete(testFile, true);
Configuration cloneConf = new Configuration(conf);
RCFileOutputFormat.setColumnNumber(cloneConf, bytesArray.length);
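// Lower the record interval so the writer flushes a row group (and its sync marker)
// every intervalRecordCount rows, giving the split-size cases known sync positions.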
cloneConf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, intervalRecordCount);
RCFile.Writer writer = new RCFile.Writer(fs, cloneConf, testFile, null, codec);
BytesRefArrayWritable bytes = new BytesRefArrayWritable(bytesArray.length);
for (int i = 0; i < bytesArray.length; i++) {
BytesRefWritable cu = new BytesRefWritable(bytesArray[i], 0, bytesArray[i].length);
bytes.set(i, cu);
}
for (int i = 0; i < writeCount; i++) {
writer.append(bytes);
}
writer.close();
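// Read the file back through the MapReduce input format and count records across all splits.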
RCFileMapReduceInputFormat<LongWritable, BytesRefArrayWritable> inputFormat = new RCFileMapReduceInputFormat<LongWritable, BytesRefArrayWritable>();
Configuration jobConf = new Configuration(cloneConf);
jobConf.set("mapred.input.dir", testDir.toString());
JobContext context = new Job(jobConf);
context.getConfiguration().setLong("mapred.max.split.size", maxSplitSize);
List<InputSplit> splits = inputFormat.getSplits(context);
assertEquals("splits length should be " + splitNumber, splitNumber, splits.size());
int readCount = 0;
for (int i = 0; i < splits.size(); i++) {
TaskAttemptContext tac = HCatHadoopShims.Instance.get().createTaskAttemptContext(jobConf, new TaskAttemptID());
RecordReader<LongWritable, BytesRefArrayWritable> rr = inputFormat.createRecordReader(splits.get(i), tac);
rr.initialize(splits.get(i), tac);
while (rr.nextKeyValue()) {
readCount++;
}
}
assertEquals("readCount should be equal to writeCount", writeCount, readCount);
}
}