[feature] add a stream load config option to wrap each csv field in double quotes. (#119)

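A minimal usage sketch (not part of this patch), assuming the connector's
usual "doris.sink.properties.*" passthrough for stream load properties; the
FE address, table name, and credentials below are placeholders:

    // Hypothetical usage: enable double-quoting for csv stream load.
    // "doris.sink.properties.add_double_quotes" reaches the streamLoadProp map
    // read in DorisStreamLoad, which then forces trim_double_quotes to true.
    df.write().format("doris")
            .option("doris.fenodes", "127.0.0.1:8030")                   // placeholder FE
            .option("doris.table.identifier", "example_db.example_tbl")  // placeholder table
            .option("user", "root")
            .option("password", "")
            .option("doris.sink.properties.format", "csv")
            .option("doris.sink.properties.add_double_quotes", "true")   // the new option
            .mode("append")
            .save();
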
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 9ecfa40..0b506b0 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -70,6 +70,7 @@
  * DorisStreamLoad
  **/
 public class DorisStreamLoad implements Serializable {
+    private static final String NULL_VALUE = "\\N";
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
 
@@ -88,6 +89,7 @@
     private final String columns;
     private final String maxFilterRatio;
     private final Map<String, String> streamLoadProp;
+    private boolean addDoubleQuotes;
     private static final long cacheExpireTimeout = 4 * 60;
     private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
     private final String fileType;
@@ -111,6 +113,11 @@
         fileType = streamLoadProp.getOrDefault("format", "csv");
         if ("csv".equals(fileType)) {
             FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
+            this.addDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("add_double_quotes", "false"));
+            if (addDoubleQuotes) {
+                LOG.info("add_double_quotes is enabled for csv format, setting trim_double_quotes to true in stream load properties.");
+                streamLoadProp.put("trim_double_quotes", "true");
+            }
         } else if ("json".equalsIgnoreCase(fileType)) {
             streamLoadProp.put("read_json_by_line", "true");
         }
@@ -189,7 +196,8 @@
                     .format(fileType)
                     .sep(FIELD_DELIMITER)
                     .delim(LINE_DELIMITER)
-                    .schema(schema).build(), streamingPassthrough);
+                    .schema(schema)
+                    .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
             httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));
             HttpResponse httpResponse = httpClient.execute(httpPut);
             loadResponse = new LoadResponse(httpResponse);
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java
index 779c057..4ce297f 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java
@@ -61,14 +61,17 @@
      */
     private final StructType schema;
 
+    private final boolean addDoubleQuotes;
+
     private RecordBatch(Iterator<InternalRow> iterator, int batchSize, String format, String sep, byte[] delim,
-                        StructType schema) {
+                        StructType schema, boolean addDoubleQuotes) {
         this.iterator = iterator;
         this.batchSize = batchSize;
         this.format = format;
         this.sep = sep;
         this.delim = delim;
         this.schema = schema;
+        this.addDoubleQuotes = addDoubleQuotes;
     }
 
     public Iterator<InternalRow> getIterator() {
@@ -94,6 +97,11 @@
     public StructType getSchema() {
         return schema;
     }
+
+    public boolean getAddDoubleQuotes() {
+        return addDoubleQuotes;
+    }
+
     public static Builder newBuilder(Iterator<InternalRow> iterator) {
         return new Builder(iterator);
     }
@@ -115,6 +122,8 @@
 
         private StructType schema;
 
+        private boolean addDoubleQuotes;
+
         public Builder(Iterator<InternalRow> iterator) {
             this.iterator = iterator;
         }
@@ -144,8 +153,13 @@
             return this;
         }
 
+        public Builder addDoubleQuotes(boolean addDoubleQuotes) {
+            this.addDoubleQuotes = addDoubleQuotes;
+            return this;
+        }
+
         public RecordBatch build() {
-            return new RecordBatch(iterator, batchSize, format, sep, delim, schema);
+            return new RecordBatch(iterator, batchSize, format, sep, delim, schema, addDoubleQuotes);
         }
 
     }
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
index 9444c1d..d705501 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
@@ -200,7 +200,11 @@
 
         switch (recordBatch.getFormat().toLowerCase()) {
             case "csv":
-                bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
+                if (recordBatch.getAddDoubleQuotes()) {
+                    bytes = DataUtil.rowAddDoubleQuotesToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
+                } else {
+                    bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
+                }
                 break;
             case "json":
                 try {
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
index aea6dde..3f53d45 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
@@ -51,6 +51,22 @@
         return builder.toString().getBytes(StandardCharsets.UTF_8);
     }
 
+    // Wraps every column value in double quotes before joining with the
+    // separator; Doris strips them on load since DorisStreamLoad forces
+    // trim_double_quotes to true whenever add_double_quotes is enabled.
+    public static byte[] rowAddDoubleQuotesToCsvBytes(InternalRow row, StructType schema, String sep) {
+        StringBuilder builder = new StringBuilder();
+        StructField[] fields = schema.fields();
+        int n = row.numFields();
+        for (int i = 0; i < n; i++) {
+            if (i > 0) {
+                builder.append(sep);
+            }
+            builder.append("\"").append(SchemaUtils.rowColumnValue(row, i, fields[i].dataType())).append("\"");
+        }
+        return builder.toString().getBytes(StandardCharsets.UTF_8);
+    }
+
     public static byte[] rowToJsonBytes(InternalRow row, StructType schema)
             throws JsonProcessingException {
         StructField[] fields = schema.fields();
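
For illustration, a self-contained sketch of the quoting behavior (plain
strings stand in for InternalRow; the values and separator are made up):

    // Standalone demo, not connector code: mirrors the field-quoting loop
    // in rowAddDoubleQuotesToCsvBytes above.
    public class QuoteDemo {
        public static void main(String[] args) {
            String[] row = {"1", "Beijing", "2023-01-01"}; // made-up values
            String sep = ",";                              // assumed column_separator
            StringBuilder builder = new StringBuilder();
            for (int i = 0; i < row.length; i++) {
                if (i > 0) {
                    builder.append(sep);
                }
                builder.append('"').append(row[i]).append('"');
            }
            System.out.println(builder); // prints: "1","Beijing","2023-01-01"
        }
    }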