[HUDI-7147] Fix npe stream sync first batch, empty schema, upsert (#10689)

* fix npe

* add empty table support as well

* use empty relation

* fix failing tests

---------

Co-authored-by: Jonathan Vexler <=>
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieSchemaNotFoundException.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieSchemaNotFoundException.java
new file mode 100644
index 0000000..12d1498
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieSchemaNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common;
+
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+
+public class HoodieSchemaNotFoundException extends HoodieSchemaException {
+  public HoodieSchemaNotFoundException(String message) {
+    super(message);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index b71073a..a8f46c4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.HoodieSchemaNotFoundException;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -588,6 +589,6 @@
   }
 
   private Supplier<Exception> schemaNotFoundError() {
-    return () -> new IllegalArgumentException("No schema found for table at " + metaClient.getBasePathV2().toString());
+    return () -> new HoodieSchemaNotFoundException("No schema found for table at " + metaClient.getBasePathV2().toString());
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
index 601718e..6997756 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
@@ -83,7 +83,9 @@
    * @return an avro Schema where null is the first.
    */
   public static Schema fixNullOrdering(Schema schema) {
-    if (schema.getType() == Schema.Type.NULL) {
+    if (schema == null) {
+      return Schema.create(Schema.Type.NULL);
+    } else if (schema.getType() == Schema.Type.NULL) {
       return schema;
     }
     return convert(convert(schema), schema.getFullName());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 4f4ae20..c346f76 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,6 +21,7 @@
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.common.HoodieSchemaNotFoundException
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieReaderConfig}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
@@ -33,14 +34,13 @@
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.util.PathUtils
-
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
 import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit, HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit, HoodieStreamSource}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
 import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConversions.mapAsJavaMap
@@ -73,7 +73,12 @@
 
   override def createRelation(sqlContext: SQLContext,
                               parameters: Map[String, String]): BaseRelation = {
-    createRelation(sqlContext, parameters, null)
+    try {
+      createRelation(sqlContext, parameters, null)
+    } catch {
+      case _: HoodieSchemaNotFoundException => new EmptyRelation(sqlContext, new StructType())
+      case e => throw e
+    }
   }
 
   override def createRelation(sqlContext: SQLContext,
@@ -373,7 +378,9 @@
         AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
       } catch {
         case _: Exception =>
-          require(schema.isDefined, "Fail to resolve source schema")
+          if (schema.isEmpty || schema.get == null) {
+            throw new HoodieSchemaNotFoundException("Failed to resolve source schema")
+          }
           schema.get
       }
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index c69ca48..9a870e6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -176,9 +176,7 @@
     } getOrElse {
       Try(schemaResolver.getTableAvroSchema) match {
         case Success(schema) => schema
-        case Failure(e) =>
-          logError("Failed to fetch schema from the table", e)
-          throw new HoodieSchemaException("Failed to fetch schema from the table")
+        case Failure(e) => throw e
       }
     }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index 34088f2..e8ca19e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -101,8 +101,7 @@
     } getOrElse {
       Try(schemaResolver.getTableAvroSchema) match {
         case Success(schema) => schema
-        case Failure(e) =>
-          throw new HoodieSchemaException("Failed to fetch schema from the table")
+        case Failure(e) => throw e
       }
     }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 39d093b..cb0209d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -18,7 +18,7 @@
 package org.apache.hudi.functional
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, KEYGENERATOR_CLASS_NAME}
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
@@ -1855,6 +1855,34 @@
     })
     assertEquals(3, clusterInstants.size)
   }
+
+
+  @Test
+  def testReadOfAnEmptyTable(): Unit = {
+    val (writeOpts, _) = getWriterReaderOpts(HoodieRecordType.AVRO)
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val fileStatuses = fs.listStatus(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME), new PathFilter {
+      override def accept(path: Path): Boolean = {
+        path.getName.endsWith(HoodieTimeline.COMMIT_ACTION)
+      }
+    })
+
+    // delete completed instant
+    fs.delete(fileStatuses.toList.get(0).getPath)
+    // try reading the empty table
+    val count = spark.read.format("hudi").load(basePath).count()
+    assertEquals(count, 0)
+  }
+
 }
 
 object TestCOWDataSource {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 4bfbee6..5294ae1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -2112,6 +2112,36 @@
   }
 
   @Test
+  public void testEmptyBatchWithNullSchemaFirstBatch() throws Exception {
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+    int parquetRecordsCount = 10;
+    prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
+    prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "partition_path", "0");
+
+    String tableBasePath = basePath + "/test_parquet_table" + testNum;
+    HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(),
+        null, PROPS_FILENAME_TEST_PARQUET, false,
+        false, 100000, false, null, null, "timestamp", null);
+
+    config.schemaProviderClassName = NullValueSchemaProvider.class.getName();
+    config.sourceClassName = TestParquetDFSSourceEmptyBatch.class.getName();
+    HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(config, jsc);
+    deltaStreamer1.sync();
+    deltaStreamer1.shutdownGracefully();
+    assertRecordCount(0, tableBasePath, sqlContext);
+
+    config.schemaProviderClassName = null;
+    config.sourceClassName = ParquetDFSSource.class.getName();
+    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
+    HoodieDeltaStreamer deltaStreamer2 = new HoodieDeltaStreamer(config, jsc);
+    deltaStreamer2.sync();
+    deltaStreamer2.shutdownGracefully();
+    //since first batch has empty schema, only records from the second batch should be written
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+  }
+
+  @Test
   public void testDeltaStreamerRestartAfterMissingHoodieProps() throws Exception {
     testDeltaStreamerRestartAfterMissingHoodieProps(true);
   }