HCATALOG-487 HCatalog should tolerate a user-defined amount of bad records

git-svn-id: https://svn.apache.org/repos/asf/incubator/hcatalog/trunk@1379476 13f79535-47bb-0310-9956-ffa450edef68
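
For context, a minimal sketch of how a client job could opt into bad-record skipping with the
new keys (the 0.01 threshold, the minimum of 5, and the "default"/"my_table" names below are
illustrative values, not defaults):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    Configuration conf = new Configuration();
    // Skip bad records as long as they stay under 1% of the input (illustrative, not the default) ...
    conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, 0.01f);
    // ... but only enforce the threshold once at least 5 bad records have been seen.
    conf.setLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY, 5);

    Job job = new Job(conf);
    job.setInputFormatClass(HCatInputFormat.class);
    // Placeholder database/table names; replace with the table being read.
    HCatInputFormat.setInput(job, InputJobInfo.create("default", "my_table", null));
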
diff --git a/CHANGES.txt b/CHANGES.txt
index d9fb340..0db81b8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -38,6 +38,8 @@
   HCAT-427 Document storage-based authorization (lefty via gates)
 
   IMPROVEMENTS
+  HCAT-487 HCatalog should tolerate a user-defined amount of bad records (traviscrawford)
+
   HCAT-488 TestHCatStorer should extend HCatBaseTest so it uses junit4 and runs inside an IDE (traviscrawford)
 
   HCAT-486 HCatalog should use checkstyle to enforce coding style (traviscrawford)
diff --git a/ivy.xml b/ivy.xml
index 71a21df..11e45bf 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -74,6 +74,8 @@
     <dependency org="com.google.code.p.arat" name="rat-lib" rev="${rats-lib.version}" conf="test->default"/>
     <dependency org="org.apache.maven" name="maven-ant-tasks" rev="${maven-ant-tasks.version}" conf="test->*"/>
     <dependency org="org.apache.hadoop" name="hadoop-test" rev="${hadoop20.version}" conf="test->default"/>
-    <dependency org="com.puppycrawl.tools" name="checkstyle" rev="${checkstyle.version}" conf="test->default"/>
+    <dependency org="com.puppycrawl.tools" name="checkstyle" rev="${checkstyle.version}" conf="test->default">
+      <exclude org="com.google.collections"/>
+    </dependency>
   </dependencies>
 </ivy-module>
diff --git a/src/java/org/apache/hcatalog/common/HCatConstants.java b/src/java/org/apache/hcatalog/common/HCatConstants.java
index e826620..ad5a355 100644
--- a/src/java/org/apache/hcatalog/common/HCatConstants.java
+++ b/src/java/org/apache/hcatalog/common/HCatConstants.java
@@ -141,4 +141,22 @@
   public static final String HCAT_DATA_TINY_SMALL_INT_PROMOTION =
       "hcat.data.tiny.small.int.promotion";
   public static final boolean HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT = false;
+
+  /**
+   * {@value} (default: {@value #HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT}).
+   * Maximum fraction of bad records that may be silently skipped without causing a task
+   * failure. This is useful when processing large data sets with corrupt records, where it is
+   * acceptable to skip some bad records.
+   */
+  public static final String HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY = "hcat.input.bad.record.threshold";
+  public static final float HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT = 0.0001f;
+
+  /**
+   * {@value} (default: {@value #HCAT_INPUT_BAD_RECORD_MIN_DEFAULT}).
+   * Number of bad records that will be tolerated before
+   * {@value #HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY} is enforced. This prevents an initial bad
+   * record from immediately causing a task failure.
+   */
+  public static final String HCAT_INPUT_BAD_RECORD_MIN_KEY = "hcat.input.bad.record.min";
+  public static final int HCAT_INPUT_BAD_RECORD_MIN_DEFAULT = 2;
 }
diff --git a/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java b/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
index 73c4186..c038619 100644
--- a/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
+++ b/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.Writable;
@@ -45,8 +46,12 @@
 class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
 
     private static final Logger LOG = LoggerFactory.getLogger(HCatRecordReader.class);
+
+    private InputErrorTracker errorTracker;
+
     WritableComparable currentKey;
     Writable currentValue;
+    HCatRecord currentHCatRecord;
 
     /** The underlying record reader to delegate to. */
     private org.apache.hadoop.mapred.RecordReader<WritableComparable, Writable> baseRecordReader;
@@ -95,6 +100,8 @@
       // Pull the table schema out of the Split info
       // TODO This should be passed in the TaskAttemptContext instead
       dataSchema = hcatSplit.getDataSchema();
+
+      errorTracker = new InputErrorTracker(taskContext.getConfiguration());
     }
 
     private org.apache.hadoop.mapred.RecordReader createBaseRecordReader(HCatSplit hcatSplit,
@@ -137,30 +144,8 @@
      * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
      */
     @Override
-    public HCatRecord getCurrentValue()
-    throws IOException, InterruptedException {
-      HCatRecord r;
-
-      try {
-
-        r = new LazyHCatRecord(deserializer.deserialize(currentValue), deserializer.getObjectInspector());
-        DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
-        int i = 0;
-        for (String fieldName : outputSchema.getFieldNames()){
-          Integer dataPosn = null;
-          if ((dataPosn = dataSchema.getPosition(fieldName)) != null){
-            dr.set(i, r.get(fieldName,dataSchema));
-          } else {
-            dr.set(i, valuesNotInDataCols.get(fieldName));
-          }
-          i++;
-        }
-
-        return dr;
-
-      } catch (Exception e) {
-        throw new IOException("Failed to create HCatRecord ",e);
-      }
+    public HCatRecord getCurrentValue() throws IOException, InterruptedException {
+      return currentHCatRecord;
     }
 
     /* (non-Javadoc)
@@ -176,21 +161,59 @@
         return 0.0f; // errored
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
-     */
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      if (currentKey == null) {
-        currentKey = baseRecordReader.createKey();
-        currentValue = baseRecordReader.createValue();
-      }
-
-        return baseRecordReader.next(currentKey,
-                                     currentValue);
+  /**
+   * Check if the wrapped RecordReader has another record, and if so convert it into an
+   * HCatRecord. Checking and converting are done together here so that a configurable fraction
+   * of bad records can be tolerated.
+   *
+   * @return if there is a next record
+   * @throws IOException on error
+   * @throws InterruptedException on error
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (currentKey == null) {
+      currentKey = baseRecordReader.createKey();
+      currentValue = baseRecordReader.createValue();
     }
 
-    /* (non-Javadoc)
+    while (baseRecordReader.next(currentKey, currentValue)) {
+      HCatRecord r = null;
+      Throwable t = null;
+
+      errorTracker.incRecords();
+
+      try {
+        Object o = deserializer.deserialize(currentValue);
+        r = new LazyHCatRecord(o, deserializer.getObjectInspector());
+      } catch (Throwable throwable) {
+        t = throwable;
+      }
+
+      if (r == null) {
+        errorTracker.incErrors(t);
+        continue;
+      }
+
+      DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
+      int i = 0;
+      for (String fieldName : outputSchema.getFieldNames()) {
+        if (dataSchema.getPosition(fieldName) != null) {
+          dr.set(i, r.get(fieldName, dataSchema));
+        } else {
+          dr.set(i, valuesNotInDataCols.get(fieldName));
+        }
+        i++;
+      }
+
+      currentHCatRecord = dr;
+      return true;
+    }
+
+    return false;
+  }
+
+  /* (non-Javadoc)
      * @see org.apache.hadoop.mapreduce.RecordReader#close()
      */
     @Override
@@ -198,4 +221,64 @@
         baseRecordReader.close();
     }
 
+  /**
+   * Tracks the number of errors in the input and throws a RuntimeException
+   * if the error rate crosses a limit.
+   * <br/>
+   * The intention is to skip over very rare file corruption or incorrect
+   * input, but catch programmer errors (incorrect format, incorrect
+   * deserializers, etc.).
+   *
+   * This class was largely copied from Elephant-Bird (thanks @rangadi!)
+   * https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java
+   */
+  static class InputErrorTracker {
+    long numRecords;
+    long numErrors;
+
+    double errorThreshold; // max fraction of errors allowed
+    long minErrors; // throw error only after this many errors
+
+    InputErrorTracker(Configuration conf) {
+      errorThreshold = conf.getFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY,
+          HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT);
+      minErrors = conf.getLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY,
+          HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_DEFAULT);
+      numRecords = 0;
+      numErrors = 0;
+    }
+
+    void incRecords() {
+      numRecords++;
+    }
+
+    void incErrors(Throwable cause) {
+      numErrors++;
+      if (numErrors > numRecords) {
+        // incorrect use of this class
+        throw new RuntimeException("Forgot to invoke incRecords()?");
+      }
+
+      if (cause == null) {
+        cause = new Exception("Unknown error");
+      }
+
+      if (errorThreshold <= 0) { // no errors are tolerated
+        throw new RuntimeException("error while reading input records", cause);
+      }
+
+      LOG.warn("Error while reading an input record ("
+          + numErrors + " out of " + numRecords + " so far): ", cause);
+
+      double errRate = numErrors / (double) numRecords;
+
+      // Will always excuse the first error. We could decide in close() whether a single
+      // error crosses the threshold, if we wanted to.
+      if (numErrors >= minErrors && errRate > errorThreshold) {
+        LOG.error(numErrors + " out of " + numRecords
+            + " crosses configured threshold (" + errorThreshold + ")");
+        throw new RuntimeException("error rate while reading input records crossed threshold", cause);
+      }
+    }
+  }
 }
diff --git a/src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java b/src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
new file mode 100644
index 0000000..33c61ac
--- /dev/null
+++ b/src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.thrift.test.IntString;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class TestHCatInputFormat extends HCatBaseTest {
+
+  private boolean setUpComplete = false;
+
+  /**
+   * Create an input sequence file with 100 records; every 10th record is bad.
+   * Load this file into a Hive table.
+   */
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    if (setUpComplete) {
+      return;
+    }
+
+    Path intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
+    LOG.info("Creating data file: " + intStringSeq);
+    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+        intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+        NullWritable.class, BytesWritable.class);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    TIOStreamTransport transport = new TIOStreamTransport(out);
+    TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+    for (int i = 1; i <= 100; i++) {
+      if (i % 10 == 0) {
+        seqFileWriter.append(NullWritable.get(), new BytesWritable("bad record".getBytes()));
+      } else {
+        out.reset();
+        IntString intString = new IntString(i, Integer.toString(i), i);
+        intString.write(protocol);
+        BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+        seqFileWriter.append(NullWritable.get(), bytesWritable);
+      }
+    }
+
+    seqFileWriter.close();
+
+    // Now let's load this file into a new Hive table.
+    Assert.assertEquals(0, driver.run("drop table if exists test_bad_records").getResponseCode());
+    Assert.assertEquals(0, driver.run(
+        "create table test_bad_records " +
+            "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+            "with serdeproperties ( " +
+            "  'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+            "  'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+            "stored as" +
+            "  inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+            "  outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+        .getResponseCode());
+    Assert.assertEquals(0, driver.run("load data local inpath '" + intStringSeq.getParent() +
+        "' into table test_bad_records").getResponseCode());
+
+    setUpComplete = true;
+  }
+
+  @Test
+  public void testBadRecordHandlingPasses() throws Exception {
+    Assert.assertTrue(runJob(0.1f));
+  }
+
+  @Test
+  public void testBadRecordHandlingFails() throws Exception {
+    Assert.assertFalse(runJob(0.01f));
+  }
+
+  private boolean runJob(float badRecordThreshold) throws Exception {
+    Configuration conf = new Configuration();
+
+    conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, badRecordThreshold);
+
+    Job job = new Job(conf);
+    job.setJarByClass(this.getClass());
+    job.setMapperClass(MyMapper.class);
+
+    job.setInputFormatClass(HCatInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    HCatInputFormat.setInput(job, InputJobInfo.create("default", "test_bad_records", null));
+
+    job.setMapOutputKeyClass(HCatRecord.class);
+    job.setMapOutputValueClass(HCatRecord.class);
+
+    job.setNumReduceTasks(0);
+
+    Path path = new Path(TEST_DATA_DIR, "test_bad_record_handling_output");
+    if (path.getFileSystem(conf).exists(path)) {
+      path.getFileSystem(conf).delete(path, true);
+    }
+
+    TextOutputFormat.setOutputPath(job, path);
+
+    return job.waitForCompletion(true);
+  }
+
+  public static class MyMapper extends Mapper<NullWritable, HCatRecord, NullWritable, Text> {
+    @Override
+    public void map(NullWritable key, HCatRecord value, Context context)
+        throws IOException, InterruptedException {
+      LOG.info("HCatRecord: " + value);
+      context.write(NullWritable.get(), new Text(value.toString()));
+    }
+  }
+}
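
For reference, the arithmetic behind the two thresholds exercised by TestHCatInputFormat: the
fixture writes 100 records with every 10th record bad, so the error rate settles at 10/100 = 0.1.
The first bad record is always excused because hcat.input.bad.record.min defaults to 2; at the
second bad record the rate is 2/20 = 0.1, which does not exceed a threshold of 0.1 (so
testBadRecordHandlingPasses succeeds) but does exceed 0.01 (so the task, and therefore the job
in testBadRecordHandlingFails, fails).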