[feature] support overwrite save mode (#149)
diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 74a53cc..4148a66 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -184,6 +184,15 @@
<artifactId>jackson-core</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
+
+ <!-- https://mvnrepository.com/artifact/com.mysql/mysql-connector-j -->
+ <dependency>
+ <groupId>com.mysql</groupId>
+ <artifactId>mysql-connector-j</artifactId>
+ <version>8.0.33</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index a144fb8..6498916 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -20,6 +20,7 @@
public interface ConfigurationOptions {
// doris fe node address
String DORIS_FENODES = "doris.fenodes";
+ String DORIS_QUERY_PORT = "doris.query.port";
String DORIS_DEFAULT_CLUSTER = "default_cluster";
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala
new file mode 100644
index 0000000..aab1032
--- /dev/null
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.jdbc
+
+import java.sql.{Connection, DriverManager}
+import java.util.Properties
+
+object JdbcUtils {
+
+ def getJdbcUrl(host: String, port: Int): String = s"jdbc:mysql://$host:$port/information_schema"
+
+ def getConnection(url: String, props: Properties): Connection = {
+
+ DriverManager.getConnection(url, props)
+ }
+
+ def getTruncateQuery(table: String): String = s"TRUNCATE TABLE $table"
+
+}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
index 049d5a2..fe7e63d 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
@@ -23,7 +23,7 @@
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -98,6 +98,7 @@
}
data.write.format(DorisSourceProvider.SHORT_NAME)
.options(insertCfg)
+ .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
.save()
}
}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 94fab9e..ac04401 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -17,7 +17,10 @@
package org.apache.doris.spark.sql
-import org.apache.doris.spark.cfg.SparkSettings
+import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.exception.DorisException
+import org.apache.doris.spark.jdbc.JdbcUtils
import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME
import org.apache.doris.spark.writer.DorisWriter
import org.apache.spark.SparkConf
@@ -28,7 +31,10 @@
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.slf4j.{Logger, LoggerFactory}
+import java.util.Properties
import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.util.control.Breaks
+import scala.util.{Failure, Success, Try}
private[sql] class DorisSourceProvider extends DataSourceRegister
with RelationProvider
@@ -54,6 +60,13 @@
val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
sparkSettings.merge(Utils.params(parameters, logger).asJava)
+
+ mode match {
+ case SaveMode.Overwrite =>
+ truncateTable(sparkSettings)
+ case _: SaveMode => // do nothing
+ }
+
// init stream loader
val writer = new DorisWriter(sparkSettings)
writer.write(data)
@@ -79,6 +92,50 @@
sparkSettings.merge(Utils.params(parameters, logger).asJava)
new DorisStreamLoadSink(sqlContext, sparkSettings)
}
+
+ private def truncateTable(sparkSettings: SparkSettings): Unit = {
+
+ val feNodes = sparkSettings.getProperty(ConfigurationOptions.DORIS_FENODES)
+ val port = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_QUERY_PORT)
+ require(feNodes != null && feNodes.nonEmpty, "doris.fenodes cannot be null or empty")
+ require(port != null, "doris.query.port cannot be null")
+ val feNodesArr = feNodes.split(",")
+ val breaks = new Breaks
+
+ var success = false
+ var exOption: Option[Throwable] = None
+
+ breaks.breakable {
+ feNodesArr.foreach(feNode => {
+ Try {
+ val host = feNode.split(":")(0)
+ val url = JdbcUtils.getJdbcUrl(host, port)
+ val props = new Properties()
+ Option(sparkSettings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER)).foreach(props.setProperty("user", _))
+ Option(sparkSettings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD)).foreach(props.setProperty("password", _))
+ val conn = JdbcUtils.getConnection(url, props) // NOTE(review): conn/statement are never closed — wrap in try/finally to avoid leaking a connection per call
+ val statement = conn.createStatement()
+ val tableIdentifier = sparkSettings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER)
+ val query = JdbcUtils.getTruncateQuery(tableIdentifier)
+ statement.execute(query)
+ success = true
+ logger.info(s"truncate table $tableIdentifier success")
+ } match {
+ case Success(_) => breaks.break()
+ case Failure(e) =>
+ exOption = Some(e)
+ logger.warn(s"truncate table failed on $feNode, error: {}", ExceptionUtils.getStackTrace(e))
+ }
+ })
+
+ }
+
+ if (!success) {
+ throw new DorisException("truncate table failed", exOption.orNull)
+ }
+
+ }
+
}
object DorisSourceProvider {