[feature] support overwrite save mode (#149)

diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 74a53cc..4148a66 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -184,6 +184,15 @@
             <artifactId>jackson-core</artifactId>
             <version>${fasterxml.jackson.version}</version>
         </dependency>
+
+        <!-- https://mvnrepository.com/artifact/com.mysql/mysql-connector-j -->
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>mysql-connector-j</artifactId>
+            <version>8.0.33</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index a144fb8..6498916 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -20,6 +20,7 @@
 public interface ConfigurationOptions {
     // doris fe node address
     String DORIS_FENODES = "doris.fenodes";
+    String DORIS_QUERY_PORT = "doris.query.port";
 
     String DORIS_DEFAULT_CLUSTER = "default_cluster";
 
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala
new file mode 100644
index 0000000..aab1032
--- /dev/null
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.jdbc
+
+import java.sql.{Connection, DriverManager}
+import java.util.Properties
+
+object JdbcUtils {
+
+  def getJdbcUrl(host: String, port: Int): String = s"jdbc:mysql://$host:$port/information_schema"
+
+  def getConnection(url: String, props: Properties): Connection = {
+
+    DriverManager.getConnection(url, props)
+  }
+
+  def getTruncateQuery(table: String): String = s"TRUNCATE TABLE $table"
+
+}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
index 049d5a2..fe7e63d 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
@@ -23,7 +23,7 @@
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -98,6 +98,7 @@
     }
     data.write.format(DorisSourceProvider.SHORT_NAME)
       .options(insertCfg)
+      .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
       .save()
   }
 }
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 94fab9e..ac04401 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -17,7 +17,10 @@
 
 package org.apache.doris.spark.sql
 
-import org.apache.doris.spark.cfg.SparkSettings
+import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.exception.DorisException
+import org.apache.doris.spark.jdbc.JdbcUtils
 import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME
 import org.apache.doris.spark.writer.DorisWriter
 import org.apache.spark.SparkConf
@@ -28,7 +31,10 @@
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.slf4j.{Logger, LoggerFactory}
 
+import java.util.Properties
 import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.util.control.Breaks
+import scala.util.{Failure, Success, Try}
 
 private[sql] class DorisSourceProvider extends DataSourceRegister
   with RelationProvider
@@ -54,6 +60,13 @@
 
     val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
     sparkSettings.merge(Utils.params(parameters, logger).asJava)
+
+    mode match {
+      case SaveMode.Overwrite =>
+        truncateTable(sparkSettings)
+      case _: SaveMode => // do nothing
+    }
+
     // init stream loader
     val writer = new DorisWriter(sparkSettings)
     writer.write(data)
@@ -79,6 +92,50 @@
     sparkSettings.merge(Utils.params(parameters, logger).asJava)
     new DorisStreamLoadSink(sqlContext, sparkSettings)
   }
+
+  private def truncateTable(sparkSettings: SparkSettings): Unit = {
+
+    val feNodes = sparkSettings.getProperty(ConfigurationOptions.DORIS_FENODES)
+    val port = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_QUERY_PORT)
+    require(feNodes != null && feNodes.nonEmpty, "doris.fenodes cannot be null or empty")
+    require(port != null, "doris.query.port cannot be null")
+    val feNodesArr = feNodes.split(",")
+    val breaks = new Breaks
+
+    var success = false
+    var exOption: Option[Exception] = None
+
+    breaks.breakable {
+      feNodesArr.foreach(feNode => {
+        Try {
+          val host = feNode.split(":")(0)
+          val url = JdbcUtils.getJdbcUrl(host, port)
+          val props = new Properties()
+          props.setProperty("user", sparkSettings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER))
+          props.setProperty("password", sparkSettings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD))
+          val conn = JdbcUtils.getConnection(url, props)
+          val statement = conn.createStatement()
+          val tableIdentifier = sparkSettings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER)
+          val query = JdbcUtils.getTruncateQuery(tableIdentifier)
+          statement.execute(query)
+          success = true
+          logger.info(s"truncate table $tableIdentifier success")
+        } match {
+          case Success(_) => breaks.break()
+          case Failure(e: Exception) =>
+            exOption = Some(e)
+            logger.warn(s"truncate table failed on $feNode, error: {}", ExceptionUtils.getStackTrace(e))
+        }
+      })
+
+    }
+
+    if (!success) {
+      throw new DorisException("truncate table failed", exOption.get)
+    }
+
+  }
+
 }
 
 object DorisSourceProvider {