[Improve] Improve import speed by compressing data on slow networks (#180)
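
Adds a new sink option, doris.sink.properties.compress_type. When it is
set, the connector renders each stream load batch as a single string,
gzips it, and uploads the compressed bytes, trading CPU for bandwidth on
slow networks. Only "gz" with the CSV format is supported; any other
combination fails fast with a StreamLoadException.

A minimal usage sketch (Java DataFrameWriter API; the FE address, table
name, and credentials are placeholders, and everything except
doris.sink.properties.compress_type is a pre-existing connector option):

    // Hypothetical job wiring; assumes df is an existing Dataset<Row>.
    df.write().format("doris")
            .option("doris.fenodes", "127.0.0.1:8030")           // placeholder FE address
            .option("doris.table.identifier", "db.tbl")          // placeholder target table
            .option("user", "root")                              // placeholder credentials
            .option("password", "")
            .option("doris.sink.properties.format", "csv")       // compression requires CSV
            .option("doris.sink.properties.compress_type", "gz") // enable gzip compression
            .save();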
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 64cb38f..7c13310 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -127,6 +127,9 @@
String DORIS_SINK_AUTO_REDIRECT = "doris.sink.auto-redirect";
boolean DORIS_SINK_AUTO_REDIRECT_DEFAULT = false;
-
+ /**
+  * Stream load compression type (the "compress_type" stream load property).
+  * Currently only "gz" is supported, and only with the CSV format.
+  */
+ String DORIS_SINK_DATA_COMPRESS_TYPE = "doris.sink.properties.compress_type";
}
\ No newline at end of file
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 9fdf4c8..08bc29d 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -39,6 +39,7 @@
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
@@ -50,6 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
@@ -70,7 +72,7 @@
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-
+import java.util.zip.GZIPOutputStream;
/**
* DorisStreamLoad
@@ -107,6 +109,7 @@
private final Integer txnRetries;
private final Integer txnIntervalMs;
private final boolean autoRedirect;
+ private final String compressType;
public DorisStreamLoad(SparkSettings settings) {
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
@@ -147,6 +150,7 @@
this.autoRedirect = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT,
ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT_DEFAULT);
+ this.compressType = settings.getProperty(ConfigurationOptions.DORIS_SINK_DATA_COMPRESS_TYPE);
}
public String getLoadUrlStr() {
@@ -222,13 +226,30 @@
String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
this.loadUrlStr = loadUrlStr;
HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema);
- RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
- .format(dataFormat)
- .sep(FIELD_DELIMITER)
- .delim(LINE_DELIMITER)
- .schema(schema)
- .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
- httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));
+
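+ // When a compress type is configured, render the whole batch as a single
+ // string and gzip it before upload; the compress_type property (set via
+ // doris.sink.properties.*) is also forwarded as a stream load header so the
+ // backend knows how to decompress. Otherwise stream rows as before.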
+ if (StringUtils.isNotEmpty(compressType)) {
+     if ("gz".equalsIgnoreCase(compressType) && dataFormat == DataFormat.CSV) {
+         RecordBatchString recordBatchString = new RecordBatchString(RecordBatch.newBuilder(rows)
+                 .format(dataFormat)
+                 .sep(FIELD_DELIMITER)
+                 .delim(LINE_DELIMITER)
+                 .schema(schema)
+                 .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
+         String content = recordBatchString.getContent();
+         byte[] compressedData = compressByGZ(content);
+         httpPut.setEntity(new ByteArrayEntity(compressedData));
+     } else {
+         throw new StreamLoadException("Compress type [" + compressType
+                 + "] is not supported for data format [" + dataFormat + "]");
+     }
+ } else {
+     RecordBatchInputStream recordBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
+             .format(dataFormat)
+             .sep(FIELD_DELIMITER)
+             .delim(LINE_DELIMITER)
+             .schema(schema)
+             .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
+     httpPut.setEntity(new InputStreamEntity(recordBatchInputStream));
+ }
HttpResponse httpResponse = httpClient.execute(httpPut);
loadResponse = new LoadResponse(httpResponse);
} catch (IOException e) {
@@ -447,7 +468,7 @@
@Override
public List<BackendV2.BackendRowV2> load(String key) throws Exception {
- return RestService.getBackendRows(settings, LOG);
+ return RestService.getBackendRows(settings, LOG);
}
}
@@ -505,4 +526,19 @@
}
+ /**
+  * Compress data with the gzip algorithm.
+  *
+  * @param content data to compress
+  * @return gzip-compressed bytes
+  */
+ public byte[] compressByGZ(String content) throws IOException {
+     byte[] compressedData;
+     try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+          GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos)) {
+         gzipOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
+         // finish() writes the gzip trailer so the byte array is complete.
+         gzipOutputStream.finish();
+         compressedData = baos.toByteArray();
+     }
+     return compressedData;
+ }
+
}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java
new file mode 100644
index 0000000..c8024ee
--- /dev/null
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchString.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.load;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.doris.spark.exception.DorisException;
+import org.apache.doris.spark.exception.IllegalArgumentException;
+import org.apache.doris.spark.util.DataUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+/**
+ * Renders a record batch as a single string for batch load.
+ */
+public class RecordBatchString {
+
+ public static final Logger LOG = LoggerFactory.getLogger(RecordBatchString.class);
+
+ /**
+ * Load record batch
+ */
+ private final RecordBatch recordBatch;
+
+ private final byte[] delim;
+
+ /**
+  * In streaming mode, data is passed through without processing.
+  */
+ private final boolean passThrough;
+
+ public RecordBatchString(RecordBatch recordBatch, boolean passThrough) {
+ this.recordBatch = recordBatch;
+ this.passThrough = passThrough;
+ this.delim = recordBatch.getDelim();
+ }
+
+ public String getContent() throws IOException {
+     String delimStr = new String(this.delim, StandardCharsets.UTF_8);
+     StringBuilder builder = new StringBuilder();
+     Iterator<InternalRow> iterator = recordBatch.getIterator();
+     while (iterator.hasNext()) {
+         // Join rows with the line delimiter, avoiding a trailing delimiter
+         // (and an out-of-range substring on an empty batch).
+         if (builder.length() > 0) {
+             builder.append(delimStr);
+         }
+         try {
+             builder.append(rowToString(iterator.next()));
+         } catch (DorisException e) {
+             throw new IOException(e);
+         }
+     }
+     return builder.toString();
+ }
+
+
+ /**
+  * Convert Spark row data to a string.
+  *
+  * @param row row data
+  * @return row string
+  * @throws DorisException if the row cannot be converted
+  */
+ private String rowToString(InternalRow row) throws DorisException {
+
+ String str;
+
+ if (passThrough) {
+ str = row.getString(0);
+ return str;
+ }
+
+ switch (recordBatch.getFormat()) {
+ case CSV:
+ str = DataUtil.rowToCsvString(row, recordBatch.getSchema(), recordBatch.getSep(), recordBatch.getAddDoubleQuotes());
+ break;
+ case JSON:
+ try {
+ str = DataUtil.rowToJsonString(row, recordBatch.getSchema());
+ } catch (JsonProcessingException e) {
+ throw new DorisException("parse row to json bytes failed", e);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported format: ", recordBatch.getFormat().toString());
+ }
+
+ return str;
+
+ }
+}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
index f7218c3..5c4527e 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
@@ -40,7 +40,7 @@
public static final String NULL_VALUE = "\\N";
- public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
+ public static String rowToCsvString(InternalRow row, StructType schema, String sep, boolean quote) {
StructField[] fields = schema.fields();
int n = row.numFields();
if (n > 0) {
@@ -51,9 +51,13 @@
value = "\"" + value + "\"";
}
return value.toString();
- }).collect(Collectors.joining(sep)).getBytes(StandardCharsets.UTF_8);
+ }).collect(Collectors.joining(sep));
}
- return StringUtils.EMPTY.getBytes(StandardCharsets.UTF_8);
+ return StringUtils.EMPTY;
+ }
+
+ public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) {
+ return rowToCsvString(row, schema, sep, quote).getBytes(StandardCharsets.UTF_8);
}
public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException {
@@ -65,4 +69,13 @@
return MAPPER.writeValueAsBytes(rowMap);
}
+ public static String rowToJsonString(InternalRow row, StructType schema) throws JsonProcessingException {
+ StructField[] fields = schema.fields();
+ Map<String, Object> rowMap = new HashMap<>(row.numFields());
+ for (int i = 0; i < fields.length; i++) {
+ rowMap.put(fields[i].name(), SchemaUtils.rowColumnValue(row, i, fields[i].dataType()));
+ }
+ return MAPPER.writeValueAsString(rowMap);
+ }
+
}
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/load/TestDorisStreamLoad.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/load/TestDorisStreamLoad.java
new file mode 100644
index 0000000..7fdd88f
--- /dev/null
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/load/TestDorisStreamLoad.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.load;
+
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.spark.SparkConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.zip.GZIPInputStream;
+
+public class TestDorisStreamLoad {
+
+ @Test
+ public void compressByGZ() throws IOException {
+ String content = "1,aa,1\n" +
+ "2,aa,2\n" +
+ "3,aa,3\n" +
+ "4,aa,4\n" +
+ "5,aa,5\n" +
+ "6,aa,6\n" +
+ "7,aa,7\n" +
+ "8,aa,8\n" +
+ "9,aa,9\n" +
+ "10,aa,10\n" +
+ "11,aa,11\n" +
+ "12,aa,12\n" +
+ "13,aa,13\n" +
+ "14,aa,14\n" +
+ "15,aa,15\n" +
+ "16,aa,16\n" +
+ "17,aa,17\n" +
+ "18,aa,18\n" +
+ "19,aa,19\n" +
+ "20,aa,20\n" +
+ "21,aa,21\n" +
+ "22,aa,22\n" +
+ "23,aa,23\n" +
+ "24,aa,24\n" +
+ "25,aa,25\n" +
+ "26,aa,26\n" +
+ "27,aa,27\n" +
+ "28,aa,28\n" +
+ "29,aa,29\n" +
+ "30,aa,30\n" +
+ "31,aa,31\n" +
+ "32,aa,32\n" +
+ "33,aa,33\n" +
+ "34,aa,34\n" +
+ "35,aa,35\n" +
+ "36,aa,36\n" +
+ "37,aa,37\n" +
+ "38,aa,38\n" +
+ "39,aa,39";
+     SparkSettings settings = new SparkSettings(new SparkConf().set("doris.table.identifier", "aa.bb"));
+     byte[] compressByte = new DorisStreamLoad(settings).compressByGZ(content);
+
+     // gzip should shrink this highly repetitive payload.
+     int contentByteLength = content.getBytes(StandardCharsets.UTF_8).length;
+     int compressByteLength = compressByte.length;
+     Assert.assertTrue(contentByteLength > compressByteLength);
+
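+     // Round-trip check: decompress with GZIPInputStream and verify the
+     // result matches the original content.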
+     ByteArrayOutputStream out = new ByteArrayOutputStream();
+     try (GZIPInputStream ungzip = new GZIPInputStream(new ByteArrayInputStream(compressByte))) {
+         byte[] buffer = new byte[1024];
+         int n;
+         while ((n = ungzip.read(buffer)) >= 0) {
+             out.write(buffer, 0, n);
+         }
+     }
+
+     String unGzipStr = new String(out.toByteArray(), StandardCharsets.UTF_8);
+     Assert.assertArrayEquals(content.getBytes(StandardCharsets.UTF_8), unGzipStr.getBytes(StandardCharsets.UTF_8));
+     Assert.assertEquals(content, unGzipStr);
+ }
+}