[fix](connector) fix npe caused by prune columns (#273)
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala
index cec9890..55dbb2c 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala
@@ -18,18 +18,35 @@
package org.apache.doris.spark.read
import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
-import org.apache.doris.spark.util.DorisDialects
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType
protected[spark] abstract class DorisScanBuilderBase(config: DorisConfig, schema: StructType) extends ScanBuilder
with SupportsPushDownRequiredColumns {
- protected var readSchema: StructType = schema
+ protected var readSchema: StructType = {
+ if (config.contains(DorisOptions.DORIS_READ_FIELDS)) {
+ val dorisReadFields = config.getValue(DorisOptions.DORIS_READ_FIELDS).split(",").map(_.trim.replaceAll("`", ""))
+ doPruneColumns(schema, dorisReadFields)
+ } else {
+ schema
+ }
+ }
override def pruneColumns(requiredSchema: StructType): Unit = {
- readSchema = StructType(requiredSchema.fields.filter(schema.contains(_)))
+ doPruneColumns(readSchema, requiredSchema.fieldNames)
+ }
+
+ private def doPruneColumns(originSchema: StructType, requiredCols: Array[String]): StructType = {
+ if (requiredCols.nonEmpty) {
+ val fields = originSchema.fields.filter(
+ field => requiredCols.contains(field.name)
+ )
+ if (fields.isEmpty) {
+ throw new IllegalArgumentException("No required columns found")
+ }
+ StructType(fields)
+ } else originSchema
}
}