[HUDI-1208] Ordering Field should be optional when precombine is turned off (#2088)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index 845967c..6d32d50 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -44,7 +44,7 @@
   }
 
   public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
-    this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order
+    this(record.isPresent() ? record.get() : null, 0); // natural order
   }
 
   @Override
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index a4d4715..0b16c31 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -142,6 +142,19 @@
     }
   }
 
+  /**
+   * Create a payload class via reflection, do not ordering/precombine value.
+   */
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record)
+      throws IOException {
+    try {
+      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
+          new Class<?>[] {Option.class}, Option.of(record));
+    } catch (Throwable e) {
+      throw new IOException("Could not create payload for class: " + payloadClass, e);
+    }
+  }
+
   public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
     checkPropNames.forEach(prop -> {
       if (!props.containsKey(prop)) {
@@ -214,6 +227,12 @@
     return new HoodieRecord<>(hKey, payload);
   }
 
+  public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey,
+                                                String payloadClass) throws IOException {
+    HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr);
+    return new HoodieRecord<>(hKey, payload);
+  }
+
   /**
    * Drop records already present in the dataset.
    *
diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
index 73711c70..d0e1326 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
@@ -50,7 +50,7 @@
   }
 
   public AWSDmsAvroPayload(Option<GenericRecord> record) {
-    this(record.get(), (record1) -> 0); // natural order
+    this(record.get(), 0); // natural order
   }
 
   /**
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7173a28..b9731df 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -141,12 +141,18 @@
           // Convert to RDD[HoodieRecord]
           val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
           val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+          val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean || operation.equals(WriteOperationType.UPSERT);
           val hoodieAllIncomingRecords = genericRecords.map(gr => {
-            val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
-              .asInstanceOf[Comparable[_]]
-            DataSourceUtils.createHoodieRecord(gr,
-              orderingVal, keyGenerator.getKey(gr),
-              parameters(PAYLOAD_CLASS_OPT_KEY))
+            val hoodieRecord = if (shouldCombine) {
+              val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
+                .asInstanceOf[Comparable[_]]
+              DataSourceUtils.createHoodieRecord(gr,
+                orderingVal, keyGenerator.getKey(gr),
+                parameters(PAYLOAD_CLASS_OPT_KEY))
+            } else {
+              DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
+            }
+            hoodieRecord
           }).toJavaRDD()
 
           // Create a HoodieWriteClient & issue the write.
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 15872dd..e165904 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -124,7 +124,6 @@
         HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
         "hoodie.bulkinsert.shuffle.parallelism" -> "4",
         DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
-        DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
         DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
         DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
         DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
@@ -163,6 +162,63 @@
     }
   }
 
+  test("test insert dataset without precombine field") {
+    val session = SparkSession.builder()
+      .appName("test_insert_without_precombine")
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+    try {
+
+      val sqlContext = session.sqlContext
+      val sc = session.sparkContext
+      val hoodieFooTableName = "hoodie_foo_tbl"
+
+      //create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        "hoodie.bulkinsert.shuffle.parallelism" -> "1",
+        DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+        DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY -> "false",
+        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+        DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+        DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+      // generate the inserts
+      val schema = DataSourceTestUtils.getStructTypeExampleSchema
+      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+      val records = DataSourceTestUtils.generateRandomRows(100)
+      val recordsSeq = convertRowListToSeq(records)
+      val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+      // write to Hudi
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams - DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, df)
+
+      // collect all parition paths to issue read of parquet files
+      val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
+        HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+      // Check the entire dataset has all records still
+      val fullPartitionPaths = new Array[String](3)
+      for (i <- 0 until fullPartitionPaths.length) {
+        fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i))
+      }
+
+      // fetch all records from parquet files generated from write to hudi
+      val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+
+      // remove metadata columns so that expected and actual DFs can be compared as is
+      val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+      assert(df.except(trimmedDf).count() == 0)
+    } finally {
+      session.stop()
+      FileUtils.deleteDirectory(path.toFile)
+    }
+  }
+
   test("test bulk insert dataset with datasource impl multiple rounds") {
     initSparkContext("test_bulk_insert_datasource")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index a8d2ac10..36f1213 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -351,10 +351,12 @@
       return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD()));
     }
 
+    boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(HoodieDeltaStreamer.Operation.UPSERT);
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
     JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
-      HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-          (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false));
+      HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
+          (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false))
+          : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
       return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
     });