Incorporating code review feedback for DataSource
diff --git a/docs/configurations.md b/docs/configurations.md
index 88b3a5a..51f2d31 100644
--- a/docs/configurations.md
+++ b/docs/configurations.md
@@ -97,7 +97,7 @@
         - [OPERATION_OPT_KEY](#OPERATION_OPT_KEY) (Default: upsert) <br/>
         <span style="color:grey">whether to do upsert, insert or bulkinsert for the write operation</span>
         - [STORAGE_TYPE_OPT_KEY](#STORAGE_TYPE_OPT_KEY) (Default: COPY_ON_WRITE) <br/>
-        <span style="color:grey">The storage type for the underlying data, for this write.</span>
+        <span style="color:grey">The storage type for the underlying data, for this write. This cannot change across writes.</span>
         - [TABLE_NAME_OPT_KEY](#TABLE_NAME_OPT_KEY) (Default: None (mandatory)) <br/>
         <span style="color:grey">Hive table name, to register the dataset into.</span>
         - [PRECOMBINE_FIELD_OPT_KEY](#PRECOMBINE_FIELD_OPT_KEY) (Default: ts) <br/>
@@ -121,7 +121,7 @@
 
     - [read options](#readoptions) (read.format.option(...)) <br/>
     <span style="color:grey">Options useful for reading datasets</span>
-        - [VIEW_TYPE_OPT_KEY](#VIEW_TYPE_OPT_KEY) (Default:  = READ_OPTIMIZED) <br/>
+        - [VIEW_TYPE_OPT_KEY](#VIEW_TYPE_OPT_KEY) (Default: read_optimized) <br/>
         <span style="color:grey">Whether data needs to be read, in incremental mode (new data since an instantTime)
         (or) Read Optimized mode (obtain latest view, based on columnar data)
         (or) Real time mode (obtain latest view, based on row & columnar data)</span>
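For context on the view-type values being lowercased here, a minimal sketch of how these read options are used from Spark, written in Scala (the SparkSession `spark`, `tablePath`, and the begin instant are placeholders; the option constants are the ones defined in DataSourceOptions.scala later in this diff):

```scala
import com.uber.hoodie.DataSourceReadOptions

// Read Optimized view: the default, whose value is now "read_optimized"
val roViewDF = spark.read
  .format("com.uber.hoodie")
  .load(tablePath + "/*/*/*/*") // glob down to the partition folders

// Incremental view: only records committed after the supplied instant time
val incViewDF = spark.read
  .format("com.uber.hoodie")
  .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20170101000000") // placeholder instant
  .load(tablePath) // incremental view is loaded from the base path of the dataset
```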
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java
index e0148f9..002b6cd 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java
@@ -107,9 +107,9 @@
         return HoodieReadClient.addHoodieSupport(sparkConf);
     }
 
-    public static HashMap<String, String> getLatestFileIsToFullPath(String basePath,
-                                                                    HoodieTimeline commitTimeline,
-                                                                    List<HoodieInstant> commitsToReturn) throws IOException {
+    public static HashMap<String, String> getLatestFileIDsToFullPath(String basePath,
+                                                                     HoodieTimeline commitTimeline,
+                                                                     List<HoodieInstant> commitsToReturn) throws IOException {
         HashMap<String, String> fileIdToFullPath = new HashMap<>();
         for (HoodieInstant commit : commitsToReturn) {
             HoodieCommitMetadata metadata =
@@ -129,7 +129,7 @@
-            new HoodieException("No commit exists at " + commitTime);
+            throw new HoodieException("No commit exists at " + commitTime);
         }
         try {
-            HashMap<String, String> paths = getLatestFileIsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant));
+            HashMap<String, String> paths = getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant));
             return sqlContext.read()
                     .parquet(paths.values().toArray(new String[paths.size()]))
                     .filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime));
@@ -150,7 +150,7 @@
                         .getInstants().collect(Collectors.toList());
         try {
             // Go over the commit metadata, and obtain the new files that need to be read.
-            HashMap<String, String> fileIdToFullPath = getLatestFileIsToFullPath(basePath, commitTimeline, commitsToReturn);
+            HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
             return sqlContext.read()
                     .parquet(fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]))
                     .filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime));
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java
index cab3d6d..e421373 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java
@@ -16,6 +16,7 @@
 
 package com.uber.hoodie.common.util;
 
+import com.uber.hoodie.avro.MercifulJsonConverter;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.exception.HoodieIOException;
 import java.net.URI;
@@ -118,4 +119,11 @@
         return new Schema.Parser()
             .parse(SchemaTestUtil.class.getResourceAsStream("/complex-test-evolved.avro"));
     }
+
+    public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber,
+                                                           String commitTime, String fileId) throws IOException {
+        TestRecord record = new TestRecord(commitTime, recordNumber, fileId);
+        MercifulJsonConverter converter = new MercifulJsonConverter(schema);
+        return converter.convert(record.toJsonString());
+    }
 }
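A quick sketch of the relocated helper in use, mirroring the assertions made by the new DataSourceDefaultsTest below (`assertEquals` is JUnit's; the field values follow the TestRecord conventions used throughout these tests):

```scala
import com.uber.hoodie.common.util.SchemaTestUtil
import org.junit.Assert.assertEquals

val schema = SchemaTestUtil.getComplexEvolvedSchema
// record number 2, written at instant "001" into file id "f1"
val rec = SchemaTestUtil.generateAvroRecordFromJson(schema, 2, "001", "f1")
assertEquals("field2", rec.get("field1"))
```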
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
index d48bfa2..ae57a4f 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
@@ -20,6 +20,7 @@
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTestUtils;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.SchemaTestUtil;
 import com.uber.hoodie.common.util.TestRecord;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -120,31 +121,11 @@
     private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) throws IOException {
         List<GenericRecord> records = new ArrayList<>(numberOfRecords);
         for(int i=0;i<numberOfRecords;i++) {
-            records.add(generateAvroRecordFromJson(schema, i, commitTime, fileId));
+            records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, commitTime, fileId));
         }
         return records;
     }
 
-    public static GenericRecord generateAvroRecord(Schema schema, int recordNumber,
-        String commitTime, String fileId) {
-        GenericRecord record = new GenericData.Record(schema);
-        record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime);
-        record.put("field1", "field" + recordNumber);
-        record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, "key_" + recordNumber);
-        record.put("field2", "field" + recordNumber);
-        record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, commitTime);
-        record.put(HoodieRecord.FILENAME_METADATA_FIELD, fileId);
-        record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, commitTime + "_" + recordNumber);
-        return record;
-    }
-
-    public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber,
-        String commitTime, String fileId) throws IOException {
-        TestRecord record = new TestRecord(commitTime, recordNumber, fileId);
-        MercifulJsonConverter converter = new MercifulJsonConverter(schema);
-        return converter.convert(record.toJsonString());
-    }
-
     public static void simulateParquetUpdates(File directory, Schema schema, String originalCommit,
         int totalNumberOfRecords, int numberOfRecordsToUpdate,
         String newCommit) throws IOException {
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
index 2968e0d..9908081 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
@@ -78,7 +78,7 @@
                 .overBaseCommit(baseCommit).withFs(FSUtils.getFs()).build();
         List<IndexedRecord> records = new ArrayList<>();
         for(int i=0; i < numberOfRecords; i++) {
-            records.add(InputFormatTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0"));
+            records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0"));
         }
         Schema writeSchema = records.get(0).getSchema();
         HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema);
diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index b78da79..2f68c14b 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -59,7 +59,8 @@
             if (i == parts.length - 1) {
                 return val.toString();
             } else {
-                if (val instanceof GenericRecord) {
+                // VC: Need a test here
+                if (!(val instanceof GenericRecord)) {
                     throw new HoodieException("Cannot find a record at part value :" + part);
                 }
                 valueNode = (GenericRecord) val;
@@ -80,7 +81,7 @@
     }
 
     /**
-     * Create a payload class via reflection, passing in an ordering/precombine value value.
+     * Create a payload class via reflection, passing in an ordering/precombine value.
      */
     public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal) throws IOException {
         try {
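The flipped `instanceof` guard above is the substantive fix: while walking a dotted field name, every intermediate hop must itself be a GenericRecord. A hedged Scala sketch of the traversal that guard sits in (the method name and the null handling are illustrative, not the exact Java in DataSourceUtils):

```scala
import com.uber.hoodie.exception.HoodieException
import org.apache.avro.generic.GenericRecord

// Resolve a dotted field name such as "testNestedRecord.userId" against a record.
def nestedFieldValAsString(record: GenericRecord, fieldName: String): String = {
  val parts = fieldName.split("\\.")
  var valueNode: GenericRecord = record
  for (i <- parts.indices) {
    val part = parts(i)
    val value = valueNode.get(part)
    if (value == null) {
      throw new HoodieException("Field " + fieldName + " not found in record")
    }
    if (i == parts.length - 1) {
      return value.toString // leaf part: return its string form
    }
    // intermediate part must be a nested record -- this is the corrected check
    if (!value.isInstanceOf[GenericRecord]) {
      throw new HoodieException("Cannot find a record at part value :" + part)
    }
    valueNode = value.asInstanceOf[GenericRecord]
  }
  throw new HoodieException("Empty field name: " + fieldName) // unreachable for non-empty names
}
```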
diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java b/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java
index 2d5a690..d43a81d 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java
@@ -49,7 +49,6 @@
      * Get a list of instant times that have occurred, from the given instant timestamp.
      *
      * @param instantTimestamp
-     * @return
      */
     public static List<String> listCommitsSince(FileSystem fs, String basePath, String instantTimestamp) {
         HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
@@ -71,7 +70,6 @@
      *
      * @param fs
      * @param basePath
-     * @return
      */
     public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, String basePath) {
         HoodieTable table = HoodieTable
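These helpers are what HoodieJavaApp (later in this diff) uses to bracket its incremental read; a minimal Scala sketch of that pattern (`spark`, `tablePath`, and the "000" begin instant are placeholders):

```scala
import com.uber.hoodie.common.util.FSUtils
import com.uber.hoodie.{DataSourceReadOptions, HoodieDataSourceHelpers}

val fs = FSUtils.getFs()
val lastInstant = HoodieDataSourceHelpers.latestCommit(fs, tablePath)             // newest completed commit
val commitsSince = HoodieDataSourceHelpers.listCommitsSince(fs, tablePath, "000") // instants after "000"

// pull only records written after instant "000"
val incDF = spark.read.format("com.uber.hoodie")
  .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
  .load(tablePath)
```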
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala
index cb799c9..c4e3307 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala
@@ -38,9 +38,9 @@
-    * Default: READ_OPTIMIZED
+    * Default: read_optimized
     */
   val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
-  val VIEW_TYPE_READ_OPTIMIZED_OPT_VAL = "READ_OPTIMIZED"
-  val VIEW_TYPE_INCREMENTAL_OPT_VAL = "INCREMENTAL"
-  val VIEW_TYPE_REALTIME_OPT_VAL = "REALTIME"
+  val VIEW_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"
+  val VIEW_TYPE_INCREMENTAL_OPT_VAL = "incremental"
+  val VIEW_TYPE_REALTIME_OPT_VAL = "realtime"
   val DEFAULT_VIEW_TYPE_OPT_VAL = VIEW_TYPE_READ_OPTIMIZED_OPT_VAL
 
 
@@ -82,6 +82,7 @@
 
   /**
     * The storage type for the underlying data, for this write.
+    * Note that this can't change across writes.
     *
     * Default: COPY_ON_WRITE
     */
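To make the new note concrete: the storage type chosen on the first write must be repeated verbatim on every later write to the same dataset. A hedged sketch (STORAGE_TYPE_OPT_KEY is the key documented in configurations.md above; `inputDF1`, `inputDF2`, and `tablePath` are placeholders):

```scala
import com.uber.hoodie.DataSourceWriteOptions
import org.apache.spark.sql.SaveMode

// initial write creates the dataset with the chosen storage type
inputDF1.write.format("com.uber.hoodie")
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, "COPY_ON_WRITE")
  .mode(SaveMode.Overwrite)
  .save(tablePath)

// later writes must keep the same storage type; it cannot change across writes
inputDF2.write.format("com.uber.hoodie")
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, "COPY_ON_WRITE")
  .mode(SaveMode.Append)
  .save(tablePath)
```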
diff --git a/hoodie-spark/src/test/java/HoodieJavaApp.java b/hoodie-spark/src/test/java/HoodieJavaApp.java
index bf79114..c61d8cc 100644
--- a/hoodie-spark/src/test/java/HoodieJavaApp.java
+++ b/hoodie-spark/src/test/java/HoodieJavaApp.java
@@ -66,9 +66,6 @@
         cli.run();
     }
 
-
-
-
     public void run() throws Exception {
 
         // Spark session setup..
@@ -103,6 +100,7 @@
                 .mode(SaveMode.Overwrite) // This will remove any existing data at path below, and create a new dataset if needed
                 .save(tablePath); // ultimately where the dataset will be placed
         String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
+        logger.info("First commit at instant time :" + commitInstantTime1);
 
         /**
          * Commit that updates records
@@ -120,6 +118,7 @@
                 .mode(SaveMode.Append)
                 .save(tablePath);
         String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
+        logger.info("Second commit at instant time :" + commitInstantTime1);
 
         /**
          * Read & do some queries
@@ -142,7 +141,7 @@
                 .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), commitInstantTime1) // Only changes in write 2 above
                 .load(tablePath); // For incremental view, pass in the root/base path of dataset
 
-        System.out.println("You will only see records from : " + commitInstantTime2);
+        logger.info("You will only see records from : " + commitInstantTime2);
         hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show();
     }
 }
diff --git a/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala b/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala
new file mode 100644
index 0000000..2996c46
--- /dev/null
+++ b/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala
@@ -0,0 +1,110 @@
+/*
+ *  Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *           http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+import com.uber.hoodie.{DataSourceWriteOptions, OverwriteWithLatestAvroPayload, SimpleKeyGenerator}
+import com.uber.hoodie.common.util.SchemaTestUtil
+import com.uber.hoodie.exception.HoodieException
+import org.apache.avro.generic.GenericRecord
+import org.apache.commons.configuration.PropertiesConfiguration
+import org.junit.Assert._
+import org.junit.{Before, Test}
+import org.scalatest.junit.AssertionsForJUnit
+
+/**
+  * Tests on the default key generator, payload classes.
+  */
+class DataSourceDefaultsTest extends AssertionsForJUnit {
+
+  val schema = SchemaTestUtil.getComplexEvolvedSchema
+  var baseRecord: GenericRecord = null
+
+  @Before def initialize(): Unit = {
+    baseRecord = SchemaTestUtil
+      .generateAvroRecordFromJson(schema, 1, "001", "f1")
+  }
+
+
+  private def getKeyConfig(recordKeyFieldName: String, partitionPathField: String): PropertiesConfiguration = {
+    val props = new PropertiesConfiguration()
+    props.addProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyFieldName)
+    props.addProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionPathField)
+    props
+  }
+  @Test def testSimpleKeyGenerator() = {
+    // top level, valid fields
+    val hk1 = new SimpleKeyGenerator(getKeyConfig("field1", "name")).getKey(baseRecord)
+    assertEquals("field1", hk1.getRecordKey)
+    assertEquals("name1", hk1.getPartitionPath)
+
+    // partitionPath field not specified
+    try {
+      val props = new PropertiesConfiguration()
+      props.addProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1")
+      new SimpleKeyGenerator(props).getKey(baseRecord)
+      fail("Should have errored out")
+    } catch {
+      case e: HoodieException => {
+        // do nothing
+      }
+    };
+
+    // partitionPath field is null
+    try {
+      new SimpleKeyGenerator(getKeyConfig("field1", null)).getKey(baseRecord)
+      fail("Should have errored out")
+    } catch {
+      case e: HoodieException => {
+        // do nothing
+      }
+    };
+
+    // nested field as record key and partition path
+    val hk2 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.isAdmin"))
+      .getKey(baseRecord)
+    assertEquals("UserId1@001", hk2.getRecordKey)
+    assertEquals("false", hk2.getPartitionPath)
+
+    // Nested record key not found
+    try {
+      new SimpleKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin"))
+        .getKey(baseRecord)
+      fail("Should have errored out")
+    } catch {
+      case e: HoodieException => {
+        // do nothing
+      }
+    };
+  }
+
+  @Test def testOverwriteWithLatestAvroPayload() = {
+    val overWritePayload1 = new OverwriteWithLatestAvroPayload(baseRecord, 1)
+    val laterRecord = SchemaTestUtil
+      .generateAvroRecordFromJson(schema, 2, "001", "f1")
+    val overWritePayload2 = new OverwriteWithLatestAvroPayload(laterRecord, 2)
+
+    // preCombine keeps the record with the greater ordering (precombine) value
+    val combinedPayload12 = overWritePayload1.preCombine(overWritePayload2)
+    val combinedGR12 = combinedPayload12.getInsertValue(schema).get().asInstanceOf[GenericRecord]
+    assertEquals("field2", combinedGR12.get("field1"))
+
+    // and the result is deterministic, regardless of the order of processing.
+    val combinedPayload21 = overWritePayload2.preCombine(overWritePayload1)
+    val combinedGR21 = combinedPayload21.getInsertValue(schema).get().asInstanceOf[GenericRecord]
+    assertEquals("field2", combinedGR21.get("field1"))
+  }
+}
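The defaults exercised by this test are what the Spark write path picks up when a job only sets the key and precombine options; a minimal sketch, assuming a DataFrame `df` whose records carry the same `field1`/`name` fields as the test record and a `ts` precombine field (the documented default), with `tablePath` as a placeholder:

```scala
import com.uber.hoodie.DataSourceWriteOptions
import org.apache.spark.sql.SaveMode

df.write.format("com.uber.hoodie")
  // SimpleKeyGenerator (the default) reads the record key and partition path from these fields
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "name")
  // OverwriteWithLatestAvroPayload (the default) keeps the record with the greater value of this field
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
  .mode(SaveMode.Append)
  .save(tablePath)
```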
diff --git a/hoodie-spark/src/test/scala/DataSourceTest.scala b/hoodie-spark/src/test/scala/DataSourceTest.scala
index 35e2b3b..764206f 100644
--- a/hoodie-spark/src/test/scala/DataSourceTest.scala
+++ b/hoodie-spark/src/test/scala/DataSourceTest.scala
@@ -21,7 +21,7 @@
 import com.uber.hoodie.config.HoodieWriteConfig
 import com.uber.hoodie.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql._
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.junit.rules.TemporaryFolder
@@ -127,9 +127,9 @@
     try {
       val hoodieROViewDF1 = spark.read.format("com.uber.hoodie")
         .load(basePath + "/*/*/*/*")
-      fail() // we would error out, since no compaction has yet occurred.
+      fail("we should error out, since no compaction has yet occurred.")
     } catch {
-      case e: Exception => {
+      case e: AnalysisException => {
         // do nothing
       }
     };