[feature] add stream load config to add double quotes for fields when using csv format. (#119)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 9ecfa40..0b506b0 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -70,6 +70,7 @@
* DorisStreamLoad
**/
public class DorisStreamLoad implements Serializable {
+ private static final String NULL_VALUE = "\\N";
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
@@ -88,6 +89,7 @@
private final String columns;
private final String maxFilterRatio;
private final Map<String, String> streamLoadProp;
+ private boolean addDoubleQuotes;
private static final long cacheExpireTimeout = 4 * 60;
private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
private final String fileType;
@@ -111,6 +113,11 @@
fileType = streamLoadProp.getOrDefault("format", "csv");
if ("csv".equals(fileType)) {
FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
+ this.addDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("add_double_quotes", "false"));
+ if (addDoubleQuotes) {
+ LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to true for prop.");
+ streamLoadProp.put("trim_double_quotes", "true");
+ }
} else if ("json".equalsIgnoreCase(fileType)) {
streamLoadProp.put("read_json_by_line", "true");
}
@@ -189,7 +196,8 @@
.format(fileType)
.sep(FIELD_DELIMITER)
.delim(LINE_DELIMITER)
- .schema(schema).build(), streamingPassthrough);
+ .schema(schema)
+ .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));
HttpResponse httpResponse = httpClient.execute(httpPut);
loadResponse = new LoadResponse(httpResponse);
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java
index 779c057..4ce297f 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java
@@ -61,14 +61,17 @@
*/
private final StructType schema;
+ private final boolean addDoubleQuotes;
+
private RecordBatch(Iterator<InternalRow> iterator, int batchSize, String format, String sep, byte[] delim,
- StructType schema) {
+ StructType schema, boolean addDoubleQuotes) {
this.iterator = iterator;
this.batchSize = batchSize;
this.format = format;
this.sep = sep;
this.delim = delim;
this.schema = schema;
+ this.addDoubleQuotes = addDoubleQuotes;
}
public Iterator<InternalRow> getIterator() {
@@ -94,6 +97,10 @@
public StructType getSchema() {
return schema;
}
+
+ public boolean getAddDoubleQuotes(){
+ return addDoubleQuotes;
+ }
public static Builder newBuilder(Iterator<InternalRow> iterator) {
return new Builder(iterator);
}
@@ -115,6 +122,8 @@
private StructType schema;
+ private boolean addDoubleQuotes;
+
public Builder(Iterator<InternalRow> iterator) {
this.iterator = iterator;
}
@@ -144,8 +153,13 @@
return this;
}
+ public Builder addDoubleQuotes(boolean addDoubleQuotes) {
+ this.addDoubleQuotes = addDoubleQuotes;
+ return this;
+ }
+
public RecordBatch build() {
- return new RecordBatch(iterator, batchSize, format, sep, delim, schema);
+ return new RecordBatch(iterator, batchSize, format, sep, delim, schema, addDoubleQuotes);
}
}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
index 9444c1d..d705501 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
@@ -200,7 +200,11 @@
switch (recordBatch.getFormat().toLowerCase()) {
case "csv":
- bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
+ if (recordBatch.getAddDoubleQuotes()) {
+ bytes = DataUtil.rowAddDoubleQuotesToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
+ } else {
+ bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
+ }
break;
case "json":
try {
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
index aea6dde..3f53d45 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
@@ -51,6 +51,22 @@
return builder.toString().getBytes(StandardCharsets.UTF_8);
}
+ public static byte[] rowAddDoubleQuotesToCsvBytes(InternalRow row, StructType schema, String sep) {
+ StringBuilder builder = new StringBuilder();
+ StructField[] fields = schema.fields();
+ int n = row.numFields();
+ if (n > 0) {
+ builder.append("\"").append(SchemaUtils.rowColumnValue(row, 0, fields[0].dataType())).append("\"");
+ int i = 1;
+ while (i < n) {
+ builder.append(sep);
+ builder.append("\"").append(SchemaUtils.rowColumnValue(row, i, fields[i].dataType())).append("\"");
+ i++;
+ }
+ }
+ return builder.toString().getBytes(StandardCharsets.UTF_8);
+ }
+
public static byte[] rowToJsonBytes(InternalRow row, StructType schema)
throws JsonProcessingException {
StructField[] fields = schema.fields();