[fix](connector) fix npe caused by prune columns  (#273)

diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala
index cec9890..55dbb2c 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala
@@ -18,18 +18,35 @@
 package org.apache.doris.spark.read
 
 import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
-import org.apache.doris.spark.util.DorisDialects
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.types.StructType
 
 protected[spark] abstract class DorisScanBuilderBase(config: DorisConfig, schema: StructType) extends ScanBuilder
   with SupportsPushDownRequiredColumns {
 
-  protected var readSchema: StructType = schema
+  protected var readSchema: StructType = {
+    if (config.contains(DorisOptions.DORIS_READ_FIELDS)) {
+      val dorisReadFields = config.getValue(DorisOptions.DORIS_READ_FIELDS).split(",").map(_.trim.replaceAll("`", ""))
+      doPruneColumns(schema, dorisReadFields)
+    } else {
+      schema
+    }
+  }
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-    readSchema = StructType(requiredSchema.fields.filter(schema.contains(_)))
+    doPruneColumns(readSchema, requiredSchema.fieldNames)
+  }
+
+  private def doPruneColumns(originSchema: StructType, requiredCols: Array[String]): StructType = {
+    if (requiredCols.nonEmpty) {
+      val fields = originSchema.fields.filter(
+        field => requiredCols.contains(field.name)
+      )
+      if (fields.isEmpty) {
+        throw new IllegalArgumentException("No required columns found")
+      }
+      StructType(fields)
+    } else originSchema
   }
 
 }