[refactor] Unified writing through DorisWriter (#104)
* use writer to write data
* resolve conflicts
* unify jackson version
* remove useless code
diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 77b37c1..e4b4c8b 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -70,7 +70,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.scm.id>github</project.scm.id>
<netty.version>4.1.77.Final</netty.version>
- <fasterxml.jackson.version>2.13.3</fasterxml.jackson.version>
+ <fasterxml.jackson.version>2.10.5</fasterxml.jackson.version>
<thrift-service.version>1.0.0</thrift-service.version>
</properties>
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
similarity index 90%
rename from spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
rename to spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
index 1d89126..d3dab49 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
@@ -15,17 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.spark;
+package org.apache.doris.spark.load;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
import org.apache.doris.spark.cfg.SparkSettings;
-import org.apache.doris.spark.exception.DorisException;
-
-import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
similarity index 96%
rename from spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
rename to spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 6738c09..61379e3 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -14,7 +14,15 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.spark;
+package org.apache.doris.spark.load;
+
+import org.apache.doris.spark.cfg.ConfigurationOptions;
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.doris.spark.exception.StreamLoadException;
+import org.apache.doris.spark.rest.RestService;
+import org.apache.doris.spark.rest.models.BackendV2;
+import org.apache.doris.spark.rest.models.RespContent;
+import org.apache.doris.spark.util.ListUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -23,13 +31,6 @@
import com.google.common.cache.LoadingCache;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.doris.spark.cfg.ConfigurationOptions;
-import org.apache.doris.spark.cfg.SparkSettings;
-import org.apache.doris.spark.exception.StreamLoadException;
-import org.apache.doris.spark.rest.RestService;
-import org.apache.doris.spark.rest.models.BackendV2;
-import org.apache.doris.spark.rest.models.RespContent;
-import org.apache.doris.spark.util.ListUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
@@ -45,10 +46,17 @@
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
-import java.sql.Date;
import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -73,13 +81,11 @@
private String tbl;
private String authEncoded;
private String columns;
- private String[] dfColumns;
private String maxFilterRatio;
private Map<String, String> streamLoadProp;
private static final long cacheExpireTimeout = 4 * 60;
private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
private final String fileType;
- private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
public DorisStreamLoad(SparkSettings settings) {
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
@@ -101,11 +107,6 @@
}
}
- public DorisStreamLoad(SparkSettings settings, String[] dfColumns) {
- this(settings);
- this.dfColumns = dfColumns;
- }
-
public String getLoadUrlStr() {
if (StringUtils.isEmpty(loadUrlStr)) {
return "";
@@ -168,7 +169,7 @@
}
- public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
+ public void loadV2(List<List<Object>> rows, String[] dfColumns) throws StreamLoadException, JsonProcessingException {
if (fileType.equals("csv")) {
load(listToString(rows));
} else if(fileType.equals("json")) {
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index e469f38..94fab9e 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -17,9 +17,9 @@
package org.apache.doris.spark.sql
-import org.apache.doris.spark.DorisStreamLoad
-import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.cfg.SparkSettings
import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME
+import org.apache.doris.spark.writer.DorisWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources._
@@ -28,12 +28,7 @@
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.slf4j.{Logger, LoggerFactory}
-import java.io.IOException
-import java.time.Duration
-import java.util
-import java.util.Objects
import scala.collection.JavaConverters.mapAsJavaMapConverter
-import scala.util.{Failure, Success}
private[sql] class DorisSourceProvider extends DataSourceRegister
with RelationProvider
@@ -60,58 +55,9 @@
val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
sparkSettings.merge(Utils.params(parameters, logger).asJava)
// init stream loader
- val dorisStreamLoader = new DorisStreamLoad(sparkSettings, data.columns)
+ val writer = new DorisWriter(sparkSettings)
+ writer.write(data)
- val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
- val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
- val sinkTaskPartitionSize = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
- val sinkTaskUseRepartition = sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
- val batchInterValMs = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
-
- logger.info(s"maxRowCount ${maxRowCount}")
- logger.info(s"maxRetryTimes ${maxRetryTimes}")
- logger.info(s"batchInterVarMs ${batchInterValMs}")
-
- var resultRdd = data.rdd
- if (Objects.nonNull(sinkTaskPartitionSize)) {
- resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
- }
-
- resultRdd.foreachPartition(partition => {
- val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
- partition.foreach(row => {
- val line: util.List[Object] = new util.ArrayList[Object]()
- for (i <- 0 until row.size) {
- val field = row.get(i)
- line.add(field.asInstanceOf[AnyRef])
- }
- rowsBuffer.add(line)
- if (rowsBuffer.size > maxRowCount - 1 ) {
- flush()
- }
- })
- // flush buffer
- if (!rowsBuffer.isEmpty) {
- flush()
- }
-
- /**
- * flush data to Doris and do retry when flush error
- *
- */
- def flush(): Unit = {
- Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
- dorisStreamLoader.loadV2(rowsBuffer)
- rowsBuffer.clear()
- } match {
- case Success(_) =>
- case Failure(e) =>
- throw new IOException(
- s"Failed to load $maxRowCount batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e)
- }
- }
-
- })
new BaseRelation {
override def sqlContext: SQLContext = unsupportedException
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 4644820..342e940 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -17,69 +17,27 @@
package org.apache.doris.spark.sql
-import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
-import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
-import org.apache.spark.rdd.RDD
+import org.apache.doris.spark.cfg.SparkSettings
+import org.apache.doris.spark.writer.DorisWriter
import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext}
import org.slf4j.{Logger, LoggerFactory}
-import java.io.IOException
-import java.time.Duration
-import java.util
-import java.util.Objects
-import scala.collection.JavaConverters._
-import scala.util.{Failure, Success}
-
private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName)
@volatile private var latestBatchId = -1L
- val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
- val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
- val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
- val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
- val batchInterValMs = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
- val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+ private val writer = new DorisWriter(settings)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= latestBatchId) {
logger.info(s"Skipping already committed batch $batchId")
} else {
- write(data.rdd)
+ writer.write(data)
latestBatchId = batchId
}
}
- def write(rdd: RDD[Row]): Unit = {
- var resultRdd = rdd
- if (Objects.nonNull(sinkTaskPartitionSize)) {
- resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
- }
- resultRdd
- .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava)
- .foreachPartition(partition => {
- partition
- .grouped(batchSize)
- .foreach(batch => flush(batch))
- })
-
- /**
- * flush data to Doris and do retry when flush error
- *
- */
- def flush(batch: Iterable[util.List[Object]]): Unit = {
- Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
- dorisStreamLoader.loadV2(batch.toList.asJava)
- } match {
- case Success(_) =>
- case Failure(e) =>
- throw new IOException(
- s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max $maxRetryTimes retry times.", e)
- }
- }
- }
-
override def toString: String = "DorisStreamLoadSink"
}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index ba6fa86..2f3a5bb 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -31,7 +31,7 @@
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
-private[sql] object Utils {
+private[spark] object Utils {
/**
* quote column name
* @param colName column name
@@ -169,7 +169,9 @@
assert(retryTimes >= 0)
val result = Try(f)
result match {
- case Success(result) => Success(result)
+ case Success(result) =>
+ LockSupport.parkNanos(interval.toNanos)
+ Success(result)
case Failure(exception: T) if retryTimes > 0 =>
logger.warn(s"Execution failed caused by: ", exception)
logger.warn(s"$retryTimes times retry remaining, the next will be in ${interval.toMillis}ms")
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
new file mode 100644
index 0000000..3839ff7
--- /dev/null
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.writer
+
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.load.{CachedDorisStreamLoadClient, DorisStreamLoad}
+import org.apache.doris.spark.sql.Utils
+import org.apache.spark.sql.DataFrame
+import org.slf4j.{Logger, LoggerFactory}
+
+import java.io.IOException
+import java.time.Duration
+import java.util
+import java.util.Objects
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success}
+
+class DorisWriter(settings: SparkSettings) extends Serializable {
+
+ private val logger: Logger = LoggerFactory.getLogger(classOf[DorisWriter])
+
+ val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE,
+ ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
+ private val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES,
+ ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
+ private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+ private val sinkTaskUseRepartition: Boolean = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
+ ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
+ private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
+ ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
+
+ private val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+
+ def write(dataFrame: DataFrame): Unit = {
+ var resultRdd = dataFrame.rdd
+ val dfColumns = dataFrame.columns
+ if (Objects.nonNull(sinkTaskPartitionSize)) {
+ resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
+ }
+ resultRdd
+ .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava)
+ .foreachPartition(partition => {
+ partition
+ .grouped(batchSize)
+ .foreach(batch => flush(batch, dfColumns))
+ })
+
+ /**
+ * flush data to Doris and do retry when flush error
+ *
+ */
+ def flush(batch: Iterable[util.List[Object]], dfColumns: Array[String]): Unit = {
+ Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+ dorisStreamLoader.loadV2(batch.toList.asJava, dfColumns)
+ } match {
+ case Success(_) =>
+ case Failure(e) =>
+ throw new IOException(
+ s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e)
+ }
+ }
+
+ }
+
+
+}