[Improve]Improve import speed by compressing data at low internet speeds (#180)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 64cb38f..7c13310 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -127,6 +127,9 @@
     String DORIS_SINK_AUTO_REDIRECT = "doris.sink.auto-redirect";
     boolean DORIS_SINK_AUTO_REDIRECT_DEFAULT = false;
 
-
+    /**
+     * compress_type
+     */
+    String DORIS_SINK_DATA_COMPRESS_TYPE = "doris.sink.properties.compress_type";
 
 }
\ No newline at end of file
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 9fdf4c8..08bc29d 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -39,6 +39,7 @@
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultRedirectStrategy;
@@ -50,6 +51,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
@@ -70,7 +72,7 @@
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-
+import java.util.zip.GZIPOutputStream;
 
 /**
  * DorisStreamLoad
@@ -107,6 +109,7 @@
     private final Integer txnRetries;
     private final Integer txnIntervalMs;
     private final boolean autoRedirect;
+    private final String compressType;
 
     public DorisStreamLoad(SparkSettings settings) {
         String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
@@ -147,6 +150,7 @@
 
         this.autoRedirect = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT,
                 ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT_DEFAULT);
+        compressType=settings.getProperty(ConfigurationOptions.DORIS_SINK_DATA_COMPRESS_TYPE);
     }
 
     public String getLoadUrlStr() {
@@ -222,13 +226,30 @@
             String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
             this.loadUrlStr = loadUrlStr;
             HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema);
-            RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
-                    .format(dataFormat)
-                    .sep(FIELD_DELIMITER)
-                    .delim(LINE_DELIMITER)
-                    .schema(schema)
-                    .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
-            httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));
+
+            if(StringUtils.isNotEmpty(compressType)){
+                if("gz".equals(compressType.toLowerCase()) && dataFormat.equals(DataFormat.CSV) ){
+                    RecordBatchString recordBatchString = new RecordBatchString(RecordBatch.newBuilder(rows)
+                            .format(dataFormat)
+                            .sep(FIELD_DELIMITER)
+                            .delim(LINE_DELIMITER)
+                            .schema(schema)
+                            .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
+                    String content = recordBatchString.getContent();
+                    byte[] compressedData = compressByGZ(content);
+                    httpPut.setEntity(new ByteArrayEntity(compressedData));
+                }else{
+                    throw new StreamLoadException("Not support the compress type [" + compressType + "] for the dataformat [" + dataFormat + "]");
+                }
+            }else{
+                RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
+                        .format(dataFormat)
+                        .sep(FIELD_DELIMITER)
+                        .delim(LINE_DELIMITER)
+                        .schema(schema)
+                        .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
+                httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));
+            }
             HttpResponse httpResponse = httpClient.execute(httpPut);
             loadResponse = new LoadResponse(httpResponse);
         } catch (IOException e) {
@@ -447,7 +468,7 @@
 
         @Override
         public List<BackendV2.BackendRowV2> load(String key) throws Exception {
-                return RestService.getBackendRows(settings, LOG);
+            return RestService.getBackendRows(settings, LOG);
         }
     }
 
@@ -505,4 +526,19 @@
 
     }
 
+    /**
+     * compress data by gz compression algorithm
+     */
+    public byte[] compressByGZ(String content) throws IOException{
+        byte[] compressedData;
+        try(ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos);
+        ){
+            gzipOutputStream.write(content.getBytes("UTF-8"));
+            gzipOutputStream.finish();
+            compressedData = baos.toByteArray();
+        }
+        return compressedData;
+    }
+
 }
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java
new file mode 100644
index 0000000..c8024ee
--- /dev/null
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.load;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.doris.spark.exception.DorisException;
+import org.apache.doris.spark.exception.IllegalArgumentException;
+import org.apache.doris.spark.util.DataUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * InputStream for batch load
+ */
+public class RecordBatchString {
+
+    public static final Logger LOG = LoggerFactory.getLogger(RecordBatchString.class);
+
+    /**
+     * Load record batch
+     */
+    private final RecordBatch recordBatch;
+
+    private final byte[] delim;
+
+    /**
+     * record count has been read
+     */
+    private int readCount = 0;
+
+    /**
+     * streaming mode pass through data without process
+     */
+    private final boolean passThrough;
+
+    public RecordBatchString(RecordBatch recordBatch, boolean passThrough) {
+        this.recordBatch = recordBatch;
+        this.passThrough = passThrough;
+        this.delim = recordBatch.getDelim();
+    }
+
+    public String getContent() throws IOException {
+        String delimStr = new String(this.recordBatch.getDelim());
+        StringBuilder builder = new StringBuilder();
+        Iterator<InternalRow> iterator = recordBatch.getIterator();
+        while (iterator.hasNext()) {
+            try {
+                builder.append(rowToString(iterator.next()));
+            } catch (DorisException e) {
+                throw new IOException(e);
+            }
+            builder.append(delimStr);
+        }
+        return builder.toString().substring(0, builder.length()-delimStr.length());
+    }
+
+
+    /**
+     * Convert Spark row data to string
+     *
+     * @param row row data
+     * @return byte array
+     * @throws DorisException
+     */
+    private String rowToString(InternalRow row) throws DorisException {
+
+        String str;
+
+        if (passThrough) {
+            str = row.getString(0);
+            return str;
+        }
+
+        switch (recordBatch.getFormat()) {
+            case CSV:
+                str = DataUtil.rowToCsvString(row, recordBatch.getSchema(), recordBatch.getSep(), recordBatch.getAddDoubleQuotes());
+                break;
+            case JSON:
+                try {
+                    str = DataUtil.rowToJsonString(row, recordBatch.getSchema());
+                } catch (JsonProcessingException e) {
+                    throw new DorisException("parse row to json bytes failed", e);
+                }
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported format: ", recordBatch.getFormat().toString());
+        }
+
+        return str;
+
+    }
+}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
index f7218c3..5c4527e 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
@@ -40,7 +40,7 @@
 
     public static final String NULL_VALUE = "\\N";
 
-    public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
+    public static String rowToCsvString(InternalRow row, StructType schema, String sep, boolean quote) {
         StructField[] fields = schema.fields();
         int n = row.numFields();
         if (n > 0) {
@@ -51,9 +51,13 @@
                     value = "\"" + value + "\"";
                 }
                 return value.toString();
-            }).collect(Collectors.joining(sep)).getBytes(StandardCharsets.UTF_8);
+            }).collect(Collectors.joining(sep));
         }
-        return StringUtils.EMPTY.getBytes(StandardCharsets.UTF_8);
+        return StringUtils.EMPTY;
+    }
+
+    public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
+        return rowToCsvString(row, schema, sep, quote).getBytes(StandardCharsets.UTF_8);
     }
 
     public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException {
@@ -65,4 +69,13 @@
         return MAPPER.writeValueAsBytes(rowMap);
     }
 
+    public static String rowToJsonString(InternalRow row, StructType schema) throws JsonProcessingException {
+        StructField[] fields = schema.fields();
+        Map<String, Object> rowMap = new HashMap<>(row.numFields());
+        for (int i = 0; i < fields.length; i++) {
+            rowMap.put(fields[i].name(), SchemaUtils.rowColumnValue(row, i, fields[i].dataType()));
+        }
+        return MAPPER.writeValueAsString(rowMap);
+    }
+
 }
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/load/TestDorisStreamLoad.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/load/TestDorisStreamLoad.java
new file mode 100644
index 0000000..7fdd88f
--- /dev/null
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/load/TestDorisStreamLoad.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.load;
+
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.spark.SparkConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestDorisStreamLoad {
+
+    @Test
+    public void compressByGZ() throws IOException {
+        String content = "1,aa,1\n" +
+                "2,aa,2\n" +
+                "3,aa,3\n" +
+                "4,aa,4\n" +
+                "5,aa,5\n" +
+                "6,aa,6\n" +
+                "7,aa,7\n" +
+                "8,aa,8\n" +
+                "9,aa,9\n" +
+                "10,aa,10\n" +
+                "11,aa,11\n" +
+                "12,aa,12\n" +
+                "13,aa,13\n" +
+                "14,aa,14\n" +
+                "15,aa,15\n" +
+                "16,aa,16\n" +
+                "17,aa,17\n" +
+                "18,aa,18\n" +
+                "19,aa,19\n" +
+                "20,aa,20\n" +
+                "21,aa,21\n" +
+                "22,aa,22\n" +
+                "23,aa,23\n" +
+                "24,aa,24\n" +
+                "25,aa,25\n" +
+                "26,aa,26\n" +
+                "27,aa,27\n" +
+                "28,aa,28\n" +
+                "29,aa,29\n" +
+                "30,aa,30\n" +
+                "31,aa,31\n" +
+                "32,aa,32\n" +
+                "33,aa,33\n" +
+                "34,aa,34\n" +
+                "35,aa,35\n" +
+                "36,aa,36\n" +
+                "37,aa,37\n" +
+                "38,aa,38\n" +
+                "39,aa,39";
+        byte[] compressByte = new DorisStreamLoad(new SparkSettings(new SparkConf().set("doris.table.identifier", "aa.bb"))).compressByGZ(content);
+
+        int contentByteLength = content.getBytes("utf-8").length;
+        int compressByteLength = compressByte.length;
+        System.out.println(contentByteLength);
+        System.out.println(compressByteLength);
+        Assert.assertTrue(contentByteLength > compressByteLength);
+
+        java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
+        java.io.ByteArrayInputStream in = new java.io.ByteArrayInputStream(compressByte);
+        java.util.zip.GZIPInputStream ungzip = new java.util.zip.GZIPInputStream(in);
+        byte[] buffer = new byte[1024];
+        int n;
+        while ((n = ungzip.read(buffer)) >= 0) out.write(buffer, 0, n);
+        byte[] unGzipByte = out.toByteArray();
+
+        String unGzipStr = new String(unGzipByte);
+        Assert.assertArrayEquals(unGzipStr.getBytes("utf-8"), content.getBytes("utf-8"));
+        Assert.assertEquals(unGzipStr, content);
+    }
+}