Extract a common method for retrying functions in Utils (#89)

* extract retry method in Utils
* revert irrelevant changes
* revert unnecessary change
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index d8ecf8d..9c12cf7 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -24,6 +24,7 @@
 import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
 import scala.collection.JavaConversions._
 import scala.util.Try
+
 import org.apache.doris.spark.backend.BackendClient
 import org.apache.doris.spark.cfg.ConfigurationOptions._
 import org.apache.doris.spark.cfg.Settings
@@ -36,10 +37,16 @@
 import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
 import org.apache.spark.internal.Logging
 
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
+import scala.collection.JavaConversions._
+import scala.util.Try
 import scala.util.control.Breaks
 
 /**
  * read data from Doris BE to array.
+ *
  * @param partition Doris RDD partition
  * @param settings request configuration
  */
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 671c4b8..e469f38 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -27,12 +27,13 @@
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.slf4j.{Logger, LoggerFactory}
+
 import java.io.IOException
+import java.time.Duration
 import java.util
-import org.apache.doris.spark.rest.RestService
 import java.util.Objects
 import scala.collection.JavaConverters.mapAsJavaMapConverter
-import scala.util.control.Breaks
+import scala.util.{Failure, Success}
 
 private[sql] class DorisSourceProvider extends DataSourceRegister
   with RelationProvider
@@ -86,49 +87,28 @@
         }
         rowsBuffer.add(line)
         if (rowsBuffer.size > maxRowCount - 1 ) {
-          flush
+          flush()
         }
       })
       // flush buffer
       if (!rowsBuffer.isEmpty) {
-        flush
+        flush()
       }
 
       /**
        * flush data to Doris and do retry when flush error
        *
        */
-      def flush = {
-        val loop = new Breaks
-        var err: Exception = null
-        loop.breakable {
-
-          for (i <- 1 to maxRetryTimes) {
-            try {
-              dorisStreamLoader.loadV2(rowsBuffer)
-              rowsBuffer.clear()
-              Thread.sleep(batchInterValMs.longValue())
-              loop.break()
-            }
-            catch {
-              case e: Exception =>
-                try {
-                  logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
-                  if (err == null) err = e
-                  Thread.sleep(1000 * i)
-                } catch {
-                  case ex: InterruptedException =>
-                    Thread.currentThread.interrupt()
-                    throw new IOException("unable to flush; interrupted while doing another attempt", e)
-                }
-            }
-          }
-
-          if (!rowsBuffer.isEmpty) {
-            throw new IOException(s"Failed to load ${maxRowCount} batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
-          }
+      def flush(): Unit = {
+        Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+          dorisStreamLoader.loadV2(rowsBuffer)
+          rowsBuffer.clear()
+        } match {
+          case Success(_) =>
+          case Failure(e) =>
+            throw new IOException(
+              s"Failed to load $maxRowCount batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e)
         }
-
       }
 
     })
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index e91e8fa..4644820 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -24,11 +24,12 @@
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
 
-import collection.JavaConverters._
 import java.io.IOException
+import java.time.Duration
 import java.util
 import java.util.Objects
-import scala.util.control.Breaks
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success}
 
 private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
 
@@ -69,33 +70,13 @@
      *
      */
     def flush(batch: Iterable[util.List[Object]]): Unit = {
-      val loop = new Breaks
-      var err: Exception = null
-      var loadSuccess: Boolean = false;
-      loop.breakable {
-        (1 to maxRetryTimes).foreach { i =>
-          try {
-            dorisStreamLoader.loadV2(batch.toList.asJava)
-            loadSuccess = true
-            Thread.sleep(batchInterValMs.longValue())
-            loop.break()
-          } catch {
-            case e: Exception =>
-              try {
-                logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
-                if (err == null) err = e
-                Thread.sleep(1000 * i)
-              } catch {
-                case ex: InterruptedException =>
-                  Thread.currentThread.interrupt()
-                  throw new IOException("unable to flush; interrupted while doing another attempt", ex)
-              }
-          }
-        }
-        // check load success, if not throw exception
-        if (!loadSuccess) {
-          throw new IOException(s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
-        }
+      Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+        dorisStreamLoader.loadV2(batch.toList.asJava)
+      } match {
+        case Success(_) =>
+        case Failure(e) =>
+          throw new IOException(
+            s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max $maxRetryTimes retry times.", e)
       }
     }
   }
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 18dd3b2..ba6fa86 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -25,6 +25,11 @@
 import org.slf4j.Logger
 
 import java.sql.{Date, Timestamp}
+import java.time.Duration
+import java.util.concurrent.locks.LockSupport
+import scala.annotation.tailrec
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
 
 private[sql] object Utils {
   /**
@@ -158,4 +163,19 @@
 
     finalParams
   }
+
+  @tailrec
+  def retry[R, T <: Throwable : ClassTag](retryTimes: Int, interval: Duration, logger: Logger)(f: => R): Try[R] = {
+    assert(retryTimes >= 0)
+    val result = Try(f)
+    result match {
+      case Success(result) => Success(result)
+      case Failure(exception: T) if retryTimes > 0 =>
+        logger.warn(s"Execution failed caused by: ", exception)
+        logger.warn(s"$retryTimes times retry remaining, the next will be in ${interval.toMillis}ms")
+        LockSupport.parkNanos(interval.toNanos)
+        retry(retryTimes - 1, interval, logger)(f)
+      case Failure(exception) => Failure(exception)
+    }
+  }
 }