[refactor] Unified writing through DorisWriter (#104)

* use DorisWriter to write data
* resolve merge conflicts
* unify the Jackson version
* remove unused code
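Both the batch path (DorisSourceProvider.createRelation) and the streaming path (DorisStreamLoadSink.addBatch) previously carried their own copies of the repartition/batch/flush-with-retry loop; they now delegate to the new org.apache.doris.spark.writer.DorisWriter. A minimal sketch of the shared entry point, using names from this diff (the surrounding setup is illustrative only):

    // Sketch only: both sinks construct the writer from SparkSettings and hand over the DataFrame.
    import org.apache.doris.spark.cfg.SparkSettings
    import org.apache.doris.spark.writer.DorisWriter
    import org.apache.spark.sql.DataFrame

    def writeToDoris(settings: SparkSettings, data: DataFrame): Unit = {
      val writer = new DorisWriter(settings) // partitioning, batching and retry now live here
      writer.write(data)
    }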
diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 77b37c1..e4b4c8b 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -70,7 +70,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.scm.id>github</project.scm.id>
         <netty.version>4.1.77.Final</netty.version>
-        <fasterxml.jackson.version>2.13.3</fasterxml.jackson.version>
+        <fasterxml.jackson.version>2.10.5</fasterxml.jackson.version>
         <thrift-service.version>1.0.0</thrift-service.version>
     </properties>
 
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
similarity index 90%
rename from spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
rename to spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
index 1d89126..d3dab49 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
@@ -15,17 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.spark;
+package org.apache.doris.spark.load;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
 import org.apache.doris.spark.cfg.SparkSettings;
-import org.apache.doris.spark.exception.DorisException;
-
-import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
similarity index 96%
rename from spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
rename to spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 6738c09..61379e3 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -14,7 +14,15 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.doris.spark;
+package org.apache.doris.spark.load;
+
+import org.apache.doris.spark.cfg.ConfigurationOptions;
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.doris.spark.exception.StreamLoadException;
+import org.apache.doris.spark.rest.RestService;
+import org.apache.doris.spark.rest.models.BackendV2;
+import org.apache.doris.spark.rest.models.RespContent;
+import org.apache.doris.spark.util.ListUtils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -23,13 +31,6 @@
 import com.google.common.cache.LoadingCache;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.doris.spark.cfg.ConfigurationOptions;
-import org.apache.doris.spark.cfg.SparkSettings;
-import org.apache.doris.spark.exception.StreamLoadException;
-import org.apache.doris.spark.rest.RestService;
-import org.apache.doris.spark.rest.models.BackendV2;
-import org.apache.doris.spark.rest.models.RespContent;
-import org.apache.doris.spark.util.ListUtils;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
@@ -45,10 +46,17 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
-import java.sql.Date;
 import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -73,13 +81,11 @@
     private String tbl;
     private String authEncoded;
     private String columns;
-    private String[] dfColumns;
     private String maxFilterRatio;
     private Map<String, String> streamLoadProp;
     private static final long cacheExpireTimeout = 4 * 60;
     private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
     private final String fileType;
-    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
 
     public DorisStreamLoad(SparkSettings settings) {
         String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
@@ -101,11 +107,6 @@
         }
     }
 
-    public DorisStreamLoad(SparkSettings settings, String[] dfColumns) {
-        this(settings);
-        this.dfColumns = dfColumns;
-    }
-
     public String getLoadUrlStr() {
         if (StringUtils.isEmpty(loadUrlStr)) {
             return "";
@@ -168,7 +169,7 @@
     }
 
 
-    public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
+    public void loadV2(List<List<Object>> rows, String[] dfColumns) throws StreamLoadException, JsonProcessingException {
         if (fileType.equals("csv")) {
             load(listToString(rows));
         } else if(fileType.equals("json")) {
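Moving dfColumns from a constructor field to a loadV2 parameter lets a single cached DorisStreamLoad instance (see CachedDorisStreamLoadClient) serve DataFrames with different column sets. A hedged calling sketch; the rows and column names below are made up for illustration:

    // Illustrative only: in the connector, rows are built from DataFrame partitions by DorisWriter.
    import java.util.{Arrays => JArrays, List => JList}

    val rows: JList[JList[Object]] = JArrays.asList(
      JArrays.asList[Object]("1", "apple", Double.box(1.5)),
      JArrays.asList[Object]("2", "pear", Double.box(2.0)))
    val loader = CachedDorisStreamLoadClient.getOrCreate(settings) // settings: SparkSettings, assumed built elsewhere
    loader.loadV2(rows, Array("id", "name", "price")) // column names are now passed per call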
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index e469f38..94fab9e 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -17,9 +17,9 @@
 
 package org.apache.doris.spark.sql
 
-import org.apache.doris.spark.DorisStreamLoad
-import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.cfg.SparkSettings
 import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME
+import org.apache.doris.spark.writer.DorisWriter
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.sources._
@@ -28,12 +28,7 @@
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.IOException
-import java.time.Duration
-import java.util
-import java.util.Objects
 import scala.collection.JavaConverters.mapAsJavaMapConverter
-import scala.util.{Failure, Success}
 
 private[sql] class DorisSourceProvider extends DataSourceRegister
   with RelationProvider
@@ -60,58 +55,9 @@
     val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
     sparkSettings.merge(Utils.params(parameters, logger).asJava)
     // init stream loader
-    val dorisStreamLoader = new DorisStreamLoad(sparkSettings, data.columns)
+    val writer = new DorisWriter(sparkSettings)
+    writer.write(data)
 
-    val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
-    val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
-    val sinkTaskPartitionSize = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
-    val sinkTaskUseRepartition = sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
-    val batchInterValMs = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
-
-    logger.info(s"maxRowCount ${maxRowCount}")
-    logger.info(s"maxRetryTimes ${maxRetryTimes}")
-    logger.info(s"batchInterVarMs ${batchInterValMs}")
-
-    var resultRdd = data.rdd
-    if (Objects.nonNull(sinkTaskPartitionSize)) {
-      resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
-    }
-
-    resultRdd.foreachPartition(partition => {
-      val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
-      partition.foreach(row => {
-        val line: util.List[Object] = new util.ArrayList[Object]()
-        for (i <- 0 until row.size) {
-          val field = row.get(i)
-          line.add(field.asInstanceOf[AnyRef])
-        }
-        rowsBuffer.add(line)
-        if (rowsBuffer.size > maxRowCount - 1 ) {
-          flush()
-        }
-      })
-      // flush buffer
-      if (!rowsBuffer.isEmpty) {
-        flush()
-      }
-
-      /**
-       * flush data to Doris and do retry when flush error
-       *
-       */
-      def flush(): Unit = {
-        Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
-          dorisStreamLoader.loadV2(rowsBuffer)
-          rowsBuffer.clear()
-        } match {
-          case Success(_) =>
-          case Failure(e) =>
-            throw new IOException(
-              s"Failed to load $maxRowCount batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e)
-        }
-      }
-
-    })
     new BaseRelation {
       override def sqlContext: SQLContext = unsupportedException
 
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 4644820..342e940 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -17,69 +17,27 @@
 
 package org.apache.doris.spark.sql
 
-import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
-import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
-import org.apache.spark.rdd.RDD
+import org.apache.doris.spark.cfg.SparkSettings
+import org.apache.doris.spark.writer.DorisWriter
 import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.IOException
-import java.time.Duration
-import java.util
-import java.util.Objects
-import scala.collection.JavaConverters._
-import scala.util.{Failure, Success}
-
 private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
 
   private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName)
   @volatile private var latestBatchId = -1L
-  val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
-  val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
-  val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
-  val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
-  val batchInterValMs = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
 
-  val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+  private val writer = new DorisWriter(settings)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= latestBatchId) {
       logger.info(s"Skipping already committed batch $batchId")
     } else {
-      write(data.rdd)
+      writer.write(data)
       latestBatchId = batchId
     }
   }
 
-  def write(rdd: RDD[Row]): Unit = {
-    var resultRdd = rdd
-    if (Objects.nonNull(sinkTaskPartitionSize)) {
-      resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
-    }
-    resultRdd
-      .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava)
-      .foreachPartition(partition => {
-        partition
-          .grouped(batchSize)
-          .foreach(batch => flush(batch))
-      })
-
-    /**
-     * flush data to Doris and do retry when flush error
-     *
-     */
-    def flush(batch: Iterable[util.List[Object]]): Unit = {
-      Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
-        dorisStreamLoader.loadV2(batch.toList.asJava)
-      } match {
-        case Success(_) =>
-        case Failure(e) =>
-          throw new IOException(
-            s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max $maxRetryTimes retry times.", e)
-      }
-    }
-  }
-
   override def toString: String = "DorisStreamLoadSink"
 }
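With the per-batch plumbing removed, the sink only de-duplicates batch ids and hands each DataFrame to DorisWriter. A hedged sketch of driving this sink from Structured Streaming; the source DataFrame, option values, and checkpoint path are placeholders, not defaults:

    // Illustrative only: streamingDf is a streaming DataFrame assumed to be defined elsewhere.
    val query = streamingDf.writeStream
      .format("doris")
      .option("doris.fenodes", "fe_host:8030")
      .option("doris.table.identifier", "db.table")
      .option("user", "root")
      .option("password", "")
      .option("checkpointLocation", "/tmp/doris_checkpoint")
      .start()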
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index ba6fa86..2f3a5bb 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -31,7 +31,7 @@
 import scala.reflect.ClassTag
 import scala.util.{Failure, Success, Try}
 
-private[sql] object Utils {
+private[spark] object Utils {
   /**
    * quote column name
    * @param colName column name
@@ -169,7 +169,9 @@
     assert(retryTimes >= 0)
     val result = Try(f)
     result match {
-      case Success(result) => Success(result)
+      case Success(result) =>
+        LockSupport.parkNanos(interval.toNanos)
+        Success(result)
       case Failure(exception: T) if retryTimes > 0 =>
         logger.warn(s"Execution failed caused by: ", exception)
         logger.warn(s"$retryTimes times retry remaining, the next will be in ${interval.toMillis}ms")
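The success branch now parks for the configured interval as well, so consecutive stream loads issued by DorisWriter are spaced by DORIS_SINK_BATCH_INTERVAL_MS instead of being fired back to back. A simplified sketch of the resulting retry contract (not the connector's exact code, which also constrains the exception type):

    // Simplified sketch: run f, pause `interval` after success or before each retry.
    import java.time.Duration
    import java.util.concurrent.locks.LockSupport
    import scala.annotation.tailrec
    import scala.util.{Failure, Success, Try}

    @tailrec
    def retrySketch[T](retries: Int, interval: Duration)(f: => T): Try[T] = Try(f) match {
      case Success(v) =>
        LockSupport.parkNanos(interval.toNanos) // new: throttle even after a successful flush
        Success(v)
      case Failure(e) if retries > 0 =>
        LockSupport.parkNanos(interval.toNanos)
        retrySketch(retries - 1, interval)(f)
      case failure => failure
    }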
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
new file mode 100644
index 0000000..3839ff7
--- /dev/null
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.writer
+
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.load.{CachedDorisStreamLoadClient, DorisStreamLoad}
+import org.apache.doris.spark.sql.Utils
+import org.apache.spark.sql.DataFrame
+import org.slf4j.{Logger, LoggerFactory}
+
+import java.io.IOException
+import java.time.Duration
+import java.util
+import java.util.Objects
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success}
+
+class DorisWriter(settings: SparkSettings) extends Serializable {
+
+  private val logger: Logger = LoggerFactory.getLogger(classOf[DorisWriter])
+
+  val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE,
+    ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
+  private val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES,
+    ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
+  private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+  private val sinkTaskUseRepartition: Boolean = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
+    ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
+  private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
+    ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
+
+  private val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+
+  def write(dataFrame: DataFrame): Unit = {
+    var resultRdd = dataFrame.rdd
+    val dfColumns = dataFrame.columns
+    if (Objects.nonNull(sinkTaskPartitionSize)) {
+      resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
+    }
+    resultRdd
+      .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava)
+      .foreachPartition(partition => {
+        partition
+          .grouped(batchSize)
+          .foreach(batch => flush(batch, dfColumns))
+      })
+
+    /**
+     * flush data to Doris and do retry when flush error
+     *
+     */
+    def flush(batch: Iterable[util.List[Object]], dfColumns: Array[String]): Unit = {
+      Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+        dorisStreamLoader.loadV2(batch.toList.asJava, dfColumns)
+      } match {
+        case Success(_) =>
+        case Failure(e) =>
+          throw new IOException(
+            s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e)
+      }
+    }
+
+  }
+
+
+}
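From the user's side the batch API is unchanged: df.write.format("doris") still goes through DorisSourceProvider.createRelation, which now simply builds this writer and calls write. A minimal usage sketch with placeholder connection options:

    // Illustrative batch write; fenodes, table identifier and credentials are placeholders.
    df.write
      .format("doris")
      .option("doris.fenodes", "fe_host:8030")
      .option("doris.table.identifier", "db.table")
      .option("user", "root")
      .option("password", "")
      .save()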