Extract a common method for retrying functions in Utils (#89)
* extract retry method in Utils
* revert irrelevant changes
* revert unnecessary change
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index d8ecf8d..9c12cf7 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -24,6 +24,7 @@
import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
import scala.collection.JavaConversions._
import scala.util.Try
+
import org.apache.doris.spark.backend.BackendClient
import org.apache.doris.spark.cfg.ConfigurationOptions._
import org.apache.doris.spark.cfg.Settings
@@ -36,10 +37,16 @@
import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
import org.apache.spark.internal.Logging
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
+import scala.collection.JavaConversions._
+import scala.util.Try
import scala.util.control.Breaks
/**
* read data from Doris BE to array.
+ *
* @param partition Doris RDD partition
* @param settings request configuration
*/
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 671c4b8..e469f38 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -27,12 +27,13 @@
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.slf4j.{Logger, LoggerFactory}
+
import java.io.IOException
+import java.time.Duration
import java.util
-import org.apache.doris.spark.rest.RestService
import java.util.Objects
import scala.collection.JavaConverters.mapAsJavaMapConverter
-import scala.util.control.Breaks
+import scala.util.{Failure, Success}
private[sql] class DorisSourceProvider extends DataSourceRegister
with RelationProvider
@@ -86,49 +87,28 @@
}
rowsBuffer.add(line)
if (rowsBuffer.size > maxRowCount - 1 ) {
- flush
+ flush()
}
})
// flush buffer
if (!rowsBuffer.isEmpty) {
- flush
+ flush()
}
/**
* flush data to Doris and do retry when flush error
*
*/
- def flush = {
- val loop = new Breaks
- var err: Exception = null
- loop.breakable {
-
- for (i <- 1 to maxRetryTimes) {
- try {
- dorisStreamLoader.loadV2(rowsBuffer)
- rowsBuffer.clear()
- Thread.sleep(batchInterValMs.longValue())
- loop.break()
- }
- catch {
- case e: Exception =>
- try {
- logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
- if (err == null) err = e
- Thread.sleep(1000 * i)
- } catch {
- case ex: InterruptedException =>
- Thread.currentThread.interrupt()
- throw new IOException("unable to flush; interrupted while doing another attempt", e)
- }
- }
- }
-
- if (!rowsBuffer.isEmpty) {
- throw new IOException(s"Failed to load ${maxRowCount} batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
- }
+ def flush(): Unit = {
+ Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+ dorisStreamLoader.loadV2(rowsBuffer)
+ rowsBuffer.clear()
+ } match {
+ case Success(_) =>
+ case Failure(e) =>
+ throw new IOException(
+ s"Failed to load $maxRowCount batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e)
}
-
}
})
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index e91e8fa..4644820 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -24,11 +24,12 @@
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.slf4j.{Logger, LoggerFactory}
-import collection.JavaConverters._
import java.io.IOException
+import java.time.Duration
import java.util
import java.util.Objects
-import scala.util.control.Breaks
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success}
private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
@@ -69,33 +70,13 @@
*
*/
def flush(batch: Iterable[util.List[Object]]): Unit = {
- val loop = new Breaks
- var err: Exception = null
- var loadSuccess: Boolean = false;
- loop.breakable {
- (1 to maxRetryTimes).foreach { i =>
- try {
- dorisStreamLoader.loadV2(batch.toList.asJava)
- loadSuccess = true
- Thread.sleep(batchInterValMs.longValue())
- loop.break()
- } catch {
- case e: Exception =>
- try {
- logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
- if (err == null) err = e
- Thread.sleep(1000 * i)
- } catch {
- case ex: InterruptedException =>
- Thread.currentThread.interrupt()
- throw new IOException("unable to flush; interrupted while doing another attempt", ex)
- }
- }
- }
- // check load success, if not throw exception
- if (!loadSuccess) {
- throw new IOException(s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
- }
+ Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+ dorisStreamLoader.loadV2(batch.toList.asJava)
+ } match {
+ case Success(_) =>
+ case Failure(e) =>
+ throw new IOException(
+ s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max $maxRetryTimes retry times.", e)
}
}
}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 18dd3b2..ba6fa86 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -25,6 +25,11 @@
import org.slf4j.Logger
import java.sql.{Date, Timestamp}
+import java.time.Duration
+import java.util.concurrent.locks.LockSupport
+import scala.annotation.tailrec
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
private[sql] object Utils {
/**
@@ -158,4 +163,19 @@
finalParams
}
+
+  /**
+   * Runs `f` and, while it keeps failing with an exception of type `T`, runs it again up to
+   * `retryTimes` more times, pausing for `interval` between attempts.
+   *
+   * `f` is evaluated at most `retryTimes + 1` times. A failure that is not an instance of `T`
+   * is returned immediately without retrying. If the calling thread is interrupted while
+   * waiting for the next attempt, retrying stops, the interrupt flag is left set for the
+   * caller to observe, and the failure of the last attempt is returned.
+   *
+   * @param retryTimes number of retries still allowed after a failed attempt; must be >= 0
+   * @param interval   pause between a failed attempt and the next one
+   * @param logger     logger used to report each failed attempt
+   * @param f          by-name computation to execute (re-evaluated on every attempt)
+   * @tparam R result type of `f`
+   * @tparam T exception type that triggers a retry (tested at runtime through its `ClassTag`)
+   * @return `Success` holding the first successful result, otherwise `Failure` holding the
+   *         exception thrown by the last attempt
+   * @throws IllegalArgumentException if `retryTimes` is negative
+   */
+  @tailrec
+  def retry[R, T <: Throwable : ClassTag](retryTimes: Int, interval: Duration, logger: Logger)(f: => R): Try[R] = {
+    // `require` instead of `assert`: assertions can be elided at compile time and raise
+    // AssertionError, which is not appropriate for argument validation.
+    require(retryTimes >= 0, s"retryTimes must be non-negative, got $retryTimes")
+    val result = Try(f)
+    result match {
+      case Failure(exception: T) if retryTimes > 0 =>
+        logger.warn("Execution failed caused by: ", exception)
+        logger.warn(s"$retryTimes times retry remaining, the next will be in ${interval.toMillis}ms")
+        // parkNanos may return early (spuriously, or immediately once the thread is interrupted),
+        // so keep parking until the deadline unless an interrupt has been requested.
+        val deadline = System.nanoTime() + interval.toNanos
+        var remaining = interval.toNanos
+        while (remaining > 0 && !Thread.currentThread().isInterrupted) {
+          LockSupport.parkNanos(remaining)
+          remaining = deadline - System.nanoTime()
+        }
+        if (Thread.currentThread().isInterrupted) {
+          // Do not burn through the remaining retries without any pause on an interrupted thread.
+          logger.warn("Interrupted while waiting for the next retry, giving up")
+          result
+        } else {
+          retry(retryTimes - 1, interval, logger)(f)
+        }
+      // Success, a non-retryable failure, or no retries left: hand back the attempt as is.
+      case _ => result
+    }
+  }
}