HCATALOG-487 HCatalog should tolerate a user-defined amount of bad records
git-svn-id: https://svn.apache.org/repos/asf/incubator/hcatalog/trunk@1379476 13f79535-47bb-0310-9956-ffa450edef68
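
The tolerance is controlled by two new job configuration keys, hcat.input.bad.record.threshold and hcat.input.bad.record.min (see HCatConstants below). As an illustrative sketch only (the 5% threshold, 100-error minimum, and "my_table" name are arbitrary examples, not values from this patch), a job could raise the tolerance like this:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    // Illustrative sketch, not part of this patch: tolerate up to 5% bad
    // records, and require at least 100 errors before the threshold is enforced.
    Configuration conf = new Configuration();
    conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, 0.05f);
    conf.setLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY, 100L);
    Job job = new Job(conf);
    // "my_table" is a placeholder table name.
    HCatInputFormat.setInput(job, InputJobInfo.create("default", "my_table", null));
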
diff --git a/CHANGES.txt b/CHANGES.txt
index d9fb340..0db81b8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -38,6 +38,8 @@
HCAT-427 Document storage-based authorization (lefty via gates)
IMPROVEMENTS
+ HCAT-487 HCatalog should tolerate a user-defined amount of bad records (traviscrawford)
+
HCAT-488 TestHCatStorer should extend HCatBaseTest so it uses junit4 and runs inside an IDE (traviscrawford)
HCAT-486 HCatalog should use checkstyle to enforce coding style (traviscrawford)
diff --git a/ivy.xml b/ivy.xml
index 71a21df..11e45bf 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -74,6 +74,8 @@
<dependency org="com.google.code.p.arat" name="rat-lib" rev="${rats-lib.version}" conf="test->default"/>
<dependency org="org.apache.maven" name="maven-ant-tasks" rev="${maven-ant-tasks.version}" conf="test->*"/>
<dependency org="org.apache.hadoop" name="hadoop-test" rev="${hadoop20.version}" conf="test->default"/>
- <dependency org="com.puppycrawl.tools" name="checkstyle" rev="${checkstyle.version}" conf="test->default"/>
+ <dependency org="com.puppycrawl.tools" name="checkstyle" rev="${checkstyle.version}" conf="test->default">
+ <exclude org="com.google.collections"/>
+ </dependency>
</dependencies>
</ivy-module>
diff --git a/src/java/org/apache/hcatalog/common/HCatConstants.java b/src/java/org/apache/hcatalog/common/HCatConstants.java
index e826620..ad5a355 100644
--- a/src/java/org/apache/hcatalog/common/HCatConstants.java
+++ b/src/java/org/apache/hcatalog/common/HCatConstants.java
@@ -141,4 +141,22 @@
public static final String HCAT_DATA_TINY_SMALL_INT_PROMOTION =
"hcat.data.tiny.small.int.promotion";
public static final boolean HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT = false;
+
+ /**
+ * {@value} (default: {@value #HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT}).
+ * Maximum fraction of bad records that will be skipped (a warning is logged for each) without
+ * causing a task failure. This is useful when processing large data sets with corrupt records,
+ * when it is acceptable to skip some of them.
+ */
+ public static final String HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY = "hcat.input.bad.record.threshold";
+ public static final float HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT = 0.0001f;
+
+ /**
+ * {@value} (default: {@value #HCAT_INPUT_BAD_RECORD_MIN_DEFAULT}).
+ * Minimum number of bad records that must be seen before the
+ * {@value #HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY} check is applied. This prevents an initial
+ * bad record from causing a task failure.
+ */
+ public static final String HCAT_INPUT_BAD_RECORD_MIN_KEY = "hcat.input.bad.record.min";
+ public static final int HCAT_INPUT_BAD_RECORD_MIN_DEFAULT = 2;
}
diff --git a/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java b/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
index 73c4186..c038619 100644
--- a/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
+++ b/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.Writable;
@@ -45,8 +46,12 @@
class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
private static final Logger LOG = LoggerFactory.getLogger(HCatRecordReader.class);
+
+ private InputErrorTracker errorTracker;
+
WritableComparable currentKey;
Writable currentValue;
+ HCatRecord currentHCatRecord;
/** The underlying record reader to delegate to. */
private org.apache.hadoop.mapred.RecordReader<WritableComparable, Writable> baseRecordReader;
@@ -95,6 +100,8 @@
// Pull the table schema out of the Split info
// TODO This should be passed in the TaskAttemptContext instead
dataSchema = hcatSplit.getDataSchema();
+
+ errorTracker = new InputErrorTracker(taskContext.getConfiguration());
}
private org.apache.hadoop.mapred.RecordReader createBaseRecordReader(HCatSplit hcatSplit,
@@ -137,30 +144,8 @@
* @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
*/
@Override
- public HCatRecord getCurrentValue()
- throws IOException, InterruptedException {
- HCatRecord r;
-
- try {
-
- r = new LazyHCatRecord(deserializer.deserialize(currentValue), deserializer.getObjectInspector());
- DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
- int i = 0;
- for (String fieldName : outputSchema.getFieldNames()){
- Integer dataPosn = null;
- if ((dataPosn = dataSchema.getPosition(fieldName)) != null){
- dr.set(i, r.get(fieldName,dataSchema));
- } else {
- dr.set(i, valuesNotInDataCols.get(fieldName));
- }
- i++;
- }
-
- return dr;
-
- } catch (Exception e) {
- throw new IOException("Failed to create HCatRecord ",e);
- }
+ public HCatRecord getCurrentValue() throws IOException, InterruptedException {
+ return currentHCatRecord;
}
/* (non-Javadoc)
@@ -176,21 +161,59 @@
return 0.0f; // errored
}
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
- */
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (currentKey == null) {
- currentKey = baseRecordReader.createKey();
- currentValue = baseRecordReader.createValue();
- }
-
- return baseRecordReader.next(currentKey,
- currentValue);
+ /**
+ * Check if the wrapped RecordReader has another record, and if so convert it into an
+ * HCatRecord. We both check for and convert records here so that a configurable fraction
+ * of bad records can be tolerated.
+ *
+ * @return if there is a next record
+ * @throws IOException on error
+ * @throws InterruptedException on error
+ */
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (currentKey == null) {
+ currentKey = baseRecordReader.createKey();
+ currentValue = baseRecordReader.createValue();
}
- /* (non-Javadoc)
+ while (baseRecordReader.next(currentKey, currentValue)) {
+ HCatRecord r = null;
+ Throwable t = null;
+
+ errorTracker.incRecords();
+
+ try {
+ Object o = deserializer.deserialize(currentValue);
+ r = new LazyHCatRecord(o, deserializer.getObjectInspector());
+ } catch (Throwable throwable) {
+ t = throwable;
+ }
+
+ if (r == null) {
+ errorTracker.incErrors(t);
+ continue;
+ }
+
+ DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
+ int i = 0;
+ for (String fieldName : outputSchema.getFieldNames()) {
+ if (dataSchema.getPosition(fieldName) != null) {
+ dr.set(i, r.get(fieldName, dataSchema));
+ } else {
+ dr.set(i, valuesNotInDataCols.get(fieldName));
+ }
+ i++;
+ }
+
+ currentHCatRecord = dr;
+ return true;
+ }
+
+ return false;
+ }
+
+ /* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.RecordReader#close()
*/
@Override
@@ -198,4 +221,64 @@
baseRecordReader.close();
}
+ /**
+ * Tracks the number of errors in the input and throws a RuntimeException
+ * if the rate of errors crosses a limit.
+ * <br/>
+ * The intention is to skip over very rare file corruption or incorrect
+ * input, but catch programmer errors (incorrect format, incorrect
+ * deserializers, etc.).
+ *
+ * This class was largely copied from Elephant-Bird (thanks @rangadi!)
+ * https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java
+ */
+ static class InputErrorTracker {
+ long numRecords;
+ long numErrors;
+
+ double errorThreshold; // max fraction of errors allowed
+ long minErrors; // throw error only after this many errors
+
+ InputErrorTracker(Configuration conf) {
+ errorThreshold = conf.getFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY,
+ HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT);
+ minErrors = conf.getLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY,
+ HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_DEFAULT);
+ numRecords = 0;
+ numErrors = 0;
+ }
+
+ void incRecords() {
+ numRecords++;
+ }
+
+ void incErrors(Throwable cause) {
+ numErrors++;
+ if (numErrors > numRecords) {
+ // incorrect use of this class
+ throw new RuntimeException("Forgot to invoke incRecords()?");
+ }
+
+ if (cause == null) {
+ cause = new Exception("Unknown error");
+ }
+
+ if (errorThreshold <= 0) { // no errors are tolerated
+ throw new RuntimeException("error while reading input records", cause);
+ }
+
+ LOG.warn("Error while reading an input record ("
+ + numErrors + " out of " + numRecords + " so far): ", cause);
+
+ double errRate = numErrors / (double) numRecords;
+
+ // This will always excuse the first error. We could decide whether a single
+ // error crosses the threshold inside close() if we wanted to.
+ if (numErrors >= minErrors && errRate > errorThreshold) {
+ LOG.error(numErrors + " out of " + numRecords
+ + " crosses configured threshold (" + errorThreshold + ")");
+ throw new RuntimeException("error rate while reading input records crossed threshold", cause);
+ }
+ }
+ }
}
diff --git a/src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java b/src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
new file mode 100644
index 0000000..33c61ac
--- /dev/null
+++ b/src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.thrift.test.IntString;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class TestHCatInputFormat extends HCatBaseTest {
+
+ private boolean setUpComplete = false;
+
+ /**
+ * Create an input sequence file with 100 records; every 10th record is bad.
+ * Load this file into a Hive table.
+ */
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ if (setUpComplete) {
+ return;
+ }
+
+ Path intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
+ LOG.info("Creating data file: " + intStringSeq);
+ SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+ intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+ NullWritable.class, BytesWritable.class);
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ TIOStreamTransport transport = new TIOStreamTransport(out);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+ for (int i = 1; i <= 100; i++) {
+ if (i % 10 == 0) {
+ seqFileWriter.append(NullWritable.get(), new BytesWritable("bad record".getBytes()));
+ } else {
+ out.reset();
+ IntString intString = new IntString(i, Integer.toString(i), i);
+ intString.write(protocol);
+ BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+ seqFileWriter.append(NullWritable.get(), bytesWritable);
+ }
+ }
+
+ seqFileWriter.close();
+
+ // Now let's load this file into a new Hive table.
+ Assert.assertEquals(0, driver.run("drop table if exists test_bad_records").getResponseCode());
+ Assert.assertEquals(0, driver.run(
+ "create table test_bad_records " +
+ "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+ "with serdeproperties ( " +
+ " 'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+ " 'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+ "stored as" +
+ " inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+ " outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+ .getResponseCode());
+ Assert.assertEquals(0, driver.run("load data local inpath '" + intStringSeq.getParent() +
+ "' into table test_bad_records").getResponseCode());
+
+ setUpComplete = true;
+ }
+
+ @Test
+ public void testBadRecordHandlingPasses() throws Exception {
+ Assert.assertTrue(runJob(0.1f));
+ }
+
+ @Test
+ public void testBadRecordHandlingFails() throws Exception {
+ Assert.assertFalse(runJob(0.01f));
+ }
+
+ private boolean runJob(float badRecordThreshold) throws Exception {
+ Configuration conf = new Configuration();
+
+ conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, badRecordThreshold);
+
+ Job job = new Job(conf);
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MyMapper.class);
+
+ job.setInputFormatClass(HCatInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+
+ HCatInputFormat.setInput(job, InputJobInfo.create("default", "test_bad_records", null));
+
+ job.setMapOutputKeyClass(HCatRecord.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+
+ job.setNumReduceTasks(0);
+
+ Path path = new Path(TEST_DATA_DIR, "test_bad_record_handling_output");
+ if (path.getFileSystem(conf).exists(path)) {
+ path.getFileSystem(conf).delete(path, true);
+ }
+
+ TextOutputFormat.setOutputPath(job, path);
+
+ return job.waitForCompletion(true);
+ }
+
+ public static class MyMapper extends Mapper<NullWritable, HCatRecord, NullWritable, Text> {
+ @Override
+ public void map(NullWritable key, HCatRecord value, Context context)
+ throws IOException, InterruptedException {
+ LOG.info("HCatRecord: " + value);
+ context.write(NullWritable.get(), new Text(value.toString()));
+ }
+ }
+}
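
Note on the thresholds used in the tests, worked through against the InputErrorTracker logic above and assuming the default hcat.input.bad.record.min of 2: the test data contains one bad record in every ten, so the second bad record arrives at record 20 and the running error rate at each error is k / (10 * k) = 0.10. With a threshold of 0.1 the check 0.10 > 0.1 is never true, so the job in testBadRecordHandlingPasses succeeds; with a threshold of 0.01 the check is true at the second error, so the job in testBadRecordHandlingFails fails as expected.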