[HUDI-7147] Fix NPE on stream sync when the first batch has an empty schema (upsert) (#10689)
* fix NPE when the first batch provides no schema
* add support for reading an empty table as well
* use an empty relation for the empty-table read path
* fix failing tests
---------
Co-authored-by: Jonathan Vexler <=>
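At a high level, reading a table whose schema cannot be resolved (for example, no completed commits yet) now falls back to an empty relation instead of surfacing an NPE or a schema exception. A minimal sketch of the expected behavior, where `spark` and `basePath` are placeholders taken from a test-style setup like the ones below:

```scala
// After this change, loading a Hudi table with no resolvable schema yields an
// empty DataFrame instead of throwing. `spark` and `basePath` are placeholders.
val df = spark.read.format("hudi").load(basePath)
assert(df.count() == 0)
```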
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieSchemaNotFoundException.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieSchemaNotFoundException.java
new file mode 100644
index 0000000..12d1498
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieSchemaNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common;
+
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+
+public class HoodieSchemaNotFoundException extends HoodieSchemaException {
+ public HoodieSchemaNotFoundException(String message) {
+ super(message);
+ }
+}
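The new exception extends HoodieSchemaException, so existing handlers keep working while callers that need to can react specifically to the narrower "schema missing" case. A minimal sketch of that pattern; the `resolveTableSchema` function is a placeholder, not Hudi API:

```scala
import org.apache.avro.Schema
import org.apache.hudi.common.HoodieSchemaNotFoundException

// Minimal sketch: distinguish "no schema yet" from other schema failures.
// `resolveTableSchema` stands in for whatever lookup the caller performs.
def readSchemaOrNone(resolveTableSchema: () => Schema): Option[Schema] =
  try {
    Some(resolveTableSchema())
  } catch {
    case _: HoodieSchemaNotFoundException => None // table has no schema yet: fall back gracefully
  }
```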
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index b71073a..a8f46c4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.table;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.HoodieSchemaNotFoundException;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -588,6 +589,6 @@
}
private Supplier<Exception> schemaNotFoundError() {
- return () -> new IllegalArgumentException("No schema found for table at " + metaClient.getBasePathV2().toString());
+ return () -> new HoodieSchemaNotFoundException("No schema found for table at " + metaClient.getBasePathV2().toString());
}
}
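For context, the supplier above defers building the exception until the schema lookup actually comes back empty; switching it to the new type is what lets the read path below recognize the condition. An illustrative sketch of the pattern (the names are placeholders, not the resolver's real fields):

```scala
import org.apache.avro.Schema
import org.apache.hudi.common.HoodieSchemaNotFoundException

// Illustrative only: the exception is constructed lazily, when the lookup yields nothing.
def tableSchemaOrThrow(lookup: () => Option[Schema], basePath: String): Schema =
  lookup().getOrElse {
    throw new HoodieSchemaNotFoundException(s"No schema found for table at $basePath")
  }
```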
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
index 601718e..6997756 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
@@ -83,7 +83,9 @@
* @return an avro Schema where null is the first.
*/
public static Schema fixNullOrdering(Schema schema) {
- if (schema.getType() == Schema.Type.NULL) {
+ if (schema == null) {
+ return Schema.create(Schema.Type.NULL);
+ } else if (schema.getType() == Schema.Type.NULL) {
return schema;
}
return convert(convert(schema), schema.getFullName());
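The null guard above is the direct NPE fix: when the first batch ships no schema at all, fixNullOrdering now degrades to Avro's NULL schema instead of dereferencing null. A quick sketch of the expected behavior:

```scala
import org.apache.avro.Schema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter

// Expected behavior sketch: a null input no longer NPEs, it degrades to Avro's NULL schema.
val fixed: Schema = AvroInternalSchemaConverter.fixNullOrdering(null)
assert(fixed.getType == Schema.Type.NULL)
```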
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 4f4ae20..c346f76 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,6 +21,7 @@
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.common.HoodieSchemaNotFoundException
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieReaderConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
@@ -33,14 +34,13 @@
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.util.PathUtils
-
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit, HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit, HoodieStreamSource}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions.mapAsJavaMap
@@ -73,7 +73,12 @@
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
- createRelation(sqlContext, parameters, null)
+ try {
+ createRelation(sqlContext, parameters, null)
+ } catch {
+ case _: HoodieSchemaNotFoundException => new EmptyRelation(sqlContext, new StructType())
+ case e: Throwable => throw e
+ }
}
override def createRelation(sqlContext: SQLContext,
@@ -373,7 +378,9 @@
AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
} catch {
case _: Exception =>
- require(schema.isDefined, "Fail to resolve source schema")
+ if (schema.isEmpty || schema.get == null) {
+ throw new HoodieSchemaNotFoundException("Failed to resolve source schema")
+ }
schema.get
}
}
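The fallback path above hands back an EmptyRelation, which is not part of this diff. For readers unfamiliar with it, here is a minimal Spark-terms sketch of what such a relation looks like; Hudi's actual EmptyRelation may be implemented differently:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType

// Sketch only: a relation with a fixed (possibly empty) schema that scans zero rows.
// Hudi's EmptyRelation (referenced above, not shown in this diff) may differ.
class SketchEmptyRelation(override val sqlContext: SQLContext,
                          override val schema: StructType)
  extends BaseRelation with TableScan {
  override def buildScan(): RDD[Row] = sqlContext.sparkContext.emptyRDD[Row]
}
```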
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index c69ca48..9a870e6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -176,9 +176,7 @@
} getOrElse {
Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
- case Failure(e) =>
- logError("Failed to fetch schema from the table", e)
- throw new HoodieSchemaException("Failed to fetch schema from the table")
+ case Failure(e) => throw e
}
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index 34088f2..e8ca19e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -101,8 +101,7 @@
} getOrElse {
Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
- case Failure(e) =>
- throw new HoodieSchemaException("Failed to fetch schema from the table")
+ case Failure(e) => throw e
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 39d093b..cb0209d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -18,7 +18,7 @@
package org.apache.hudi.functional
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, KEYGENERATOR_CLASS_NAME}
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
@@ -1855,6 +1855,34 @@
})
assertEquals(3, clusterInstants.size)
}
+
+
+ @Test
+ def testReadOfAnEmptyTable(): Unit = {
+ val (writeOpts, _) = getWriterReaderOpts(HoodieRecordType.AVRO)
+
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF.write.format("hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val fileStatuses = fs.listStatus(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME), new PathFilter {
+ override def accept(path: Path): Boolean = {
+ path.getName.endsWith(HoodieTimeline.COMMIT_ACTION)
+ }
+ })
+
+ // delete the completed commit instant so the table has no resolvable schema
+ fs.delete(fileStatuses.head.getPath, false)
+ // try reading the empty table
+ val count = spark.read.format("hudi").load(basePath).count()
+ assertEquals(0, count)
+ }
+
}
object TestCOWDataSource {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 4bfbee6..5294ae1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -2112,6 +2112,36 @@
}
@Test
+ public void testEmptyBatchWithNullSchemaFirstBatch() throws Exception {
+ PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+ int parquetRecordsCount = 10;
+ prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
+ prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+ PARQUET_SOURCE_ROOT, false, "partition_path", "0");
+
+ String tableBasePath = basePath + "/test_parquet_table" + testNum;
+ HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(),
+ null, PROPS_FILENAME_TEST_PARQUET, false,
+ false, 100000, false, null, null, "timestamp", null);
+
+ config.schemaProviderClassName = NullValueSchemaProvider.class.getName();
+ config.sourceClassName = TestParquetDFSSourceEmptyBatch.class.getName();
+ HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(config, jsc);
+ deltaStreamer1.sync();
+ deltaStreamer1.shutdownGracefully();
+ assertRecordCount(0, tableBasePath, sqlContext);
+
+ config.schemaProviderClassName = null;
+ config.sourceClassName = ParquetDFSSource.class.getName();
+ prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
+ HoodieDeltaStreamer deltaStreamer2 = new HoodieDeltaStreamer(config, jsc);
+ deltaStreamer2.sync();
+ deltaStreamer2.shutdownGracefully();
+ // since the first batch has an empty schema, only records from the second batch should be written
+ assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+ }
+
+ @Test
public void testDeltaStreamerRestartAfterMissingHoodieProps() throws Exception {
testDeltaStreamerRestartAfterMissingHoodieProps(true);
}
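The streaming test above pairs an empty-batch source with a schema provider that yields no usable schema, which is exactly the first-batch scenario that used to NPE. A hedged sketch of such a provider, assuming the usual SchemaProvider shape (constructor taking TypedProperties and JavaSparkContext, abstract getSourceSchema); the NullValueSchemaProvider actually used above lives in the test utilities and may differ:

```scala
import org.apache.avro.Schema
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.utilities.schema.SchemaProvider
import org.apache.spark.api.java.JavaSparkContext

// Hedged sketch of a "no schema" provider for simulating the empty first batch.
// Constructor/override details assume the common SchemaProvider shape and may not
// match the NullValueSchemaProvider referenced in the test above.
class NoSchemaProvider(props: TypedProperties, jssc: JavaSparkContext)
  extends SchemaProvider(props, jssc) {
  override def getSourceSchema: Schema = Schema.create(Schema.Type.NULL)
}
```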