Fix escaping of column_separator and line_delimiter (#121)
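
Inline the hex-escape decoding of column_separator and line_delimiter
into DorisStreamLoad and drop the regex-based EscapeHandler utility
together with its test. Member visibility is also tightened in the
Scala RDD reader classes.

Note that the inlined decoder expects a single leading "\x" followed by
hex digit pairs (e.g. "\x0909" for two tabs), whereas the removed
EscapeHandler matched repeated "\xNN" groups anywhere in the value.

A minimal sketch of the resulting configuration (the map below is
illustrative, not part of this change; DorisStreamLoad reads these keys
from its stream load properties):

    Map<String, String> streamLoadProp = new HashMap<>();
    // decoded by escapeString to the 0x01 control character
    streamLoadProp.put("column_separator", "\\x01");
    // decoded by escapeString to "\n"
    streamLoadProp.put("line_delimiter", "\\x0A");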

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index e1c1bc1..5341f67 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -14,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
 package org.apache.doris.spark.load;
 
 import org.apache.doris.spark.cfg.ConfigurationOptions;
@@ -23,7 +22,6 @@
 import org.apache.doris.spark.rest.RestService;
 import org.apache.doris.spark.rest.models.BackendV2;
 import org.apache.doris.spark.rest.models.RespContent;
-import org.apache.doris.spark.util.EscapeHandler;
 import org.apache.doris.spark.util.ListUtils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -105,8 +103,8 @@
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
         fileType = streamLoadProp.getOrDefault("format", "csv");
-        if ("csv".equals(fileType)){
-            FIELD_DELIMITER = EscapeHandler.escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
+        if ("csv".equals(fileType)) {
+            FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
         } else if ("json".equalsIgnoreCase(fileType)) {
             readJsonByLine = Boolean.parseBoolean(streamLoadProp.getOrDefault("read_json_by_line", "false"));
             boolean stripOuterArray = Boolean.parseBoolean(streamLoadProp.getOrDefault("strip_outer_array", "false"));
@@ -114,7 +112,7 @@
                 throw new IllegalArgumentException("Only one of options 'read_json_by_line' and 'strip_outer_array' can be set to true");
             }
         }
-        LINE_DELIMITER = EscapeHandler.escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n"));
+        LINE_DELIMITER = escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n"));
     }
 
     public String getLoadUrlStr() {
@@ -303,4 +301,23 @@
 
     }
 
+    // decode a hex-escaped delimiter such as "\x01" into its real character form
+    private String escapeString(String hexData) {
+        if (hexData.startsWith("\\x") || hexData.startsWith("\\X")) {
+            try {
+                // strip the leading "\x" and parse the remainder as hex byte pairs
+                hexData = hexData.substring(2);
+                StringBuilder stringBuilder = new StringBuilder();
+                for (int i = 0; i < hexData.length(); i += 2) {
+                    String hexByte = hexData.substring(i, i + 2);
+                    stringBuilder.append((char) Integer.parseInt(hexByte, 16));
+                }
+                return stringBuilder.toString();
+            } catch (Exception e) {
+                throw new RuntimeException("escape column_separator or line_delimiter error.", e);
+            }
+        }
+        return hexData;
+    }
+
 }
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java
deleted file mode 100644
index 87a3989..0000000
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.spark.util;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class EscapeHandler {
-    public static final String ESCAPE_DELIMITERS_FLAGS = "\\x";
-    public static final Pattern ESCAPE_PATTERN = Pattern.compile("\\\\x([0-9|a-f|A-F]{2})");
-
-    public static String escapeString(String source) {
-        if (source.contains(ESCAPE_DELIMITERS_FLAGS)) {
-            Matcher m = ESCAPE_PATTERN.matcher(source);
-            StringBuffer buf = new StringBuffer();
-            while (m.find()) {
-                m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1), 16)));
-            }
-            m.appendTail(buf);
-            return buf.toString();
-        }
-        return source;
-    }
-
-}
\ No newline at end of file
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala
index 5b2b36f..902c634 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala
@@ -33,7 +33,7 @@
   private var closed = false
 
   // the reader obtain data from Doris BE
-  lazy val reader = {
+  private lazy val reader = {
     initialized = true
     val settings = partition.settings()
     initReader(settings)
@@ -64,7 +64,7 @@
     createValue(value)
   }
 
-  def closeIfNeeded(): Unit = {
+  private def closeIfNeeded(): Unit = {
     logger.trace(s"Close status is '$closed' when close Doris RDD Iterator")
     if (!closed) {
       close()
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaDorisRDD.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaDorisRDD.scala
index e764ea0..0ff8bbd 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaDorisRDD.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaDorisRDD.scala
@@ -39,7 +39,7 @@
     partition: PartitionDefinition)
     extends AbstractDorisRDDIterator[T](context, partition) {
 
-  override def initReader(settings: Settings) = {
+  override def initReader(settings: Settings): Unit = {
     settings.setProperty(DORIS_VALUE_READER_CLASS, classOf[ScalaValueReader].getName)
   }
 
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 9c12cf7..719b16b 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -37,11 +37,6 @@
 import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
 import org.apache.spark.internal.Logging
 
-import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
-import scala.collection.JavaConversions._
-import scala.util.Try
 import scala.util.control.Breaks
 
 /**
@@ -50,36 +45,45 @@
  * @param partition Doris RDD partition
  * @param settings request configuration
  */
-class ScalaValueReader(partition: PartitionDefinition, settings: Settings) extends Logging{
+class ScalaValueReader(partition: PartitionDefinition, settings: Settings) extends Logging {
 
-  protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
-  protected val clientLock =
-    if (deserializeArrowToRowBatchAsync) new ReentrantLock()
-    else new NoOpLock
-  protected var offset = 0
-  protected var eos: AtomicBoolean = new AtomicBoolean(false)
+  private[this] lazy val client = new BackendClient(new Routing(partition.getBeAddress), settings)
+
+  private[this] var offset = 0
+
+  private[this] val eos: AtomicBoolean = new AtomicBoolean(false)
+
   protected var rowBatch: RowBatch = _
+
   // flag indicate if support deserialize Arrow to RowBatch asynchronously
-  protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
+  private[this] lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
     settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean
   } getOrElse {
-    logWarning(String.format(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)))
+    logWarning(
+      String.format(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE,
+        DORIS_DESERIALIZE_ARROW_ASYNC,
+        settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)
+      )
+    )
     DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT
   }
 
-  protected var rowBatchBlockingQueue: BlockingQueue[RowBatch] = {
+  private[this] val rowBatchBlockingQueue: BlockingQueue[RowBatch] = {
     val blockingQueueSize = Try {
       settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE, DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString).toInt
     } getOrElse {
       logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE)))
       DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT
     }
-
-    var queue: BlockingQueue[RowBatch] = null
     if (deserializeArrowToRowBatchAsync) {
-      queue = new ArrayBlockingQueue(blockingQueueSize)
+      new ArrayBlockingQueue(blockingQueueSize)
+    } else {
+      null
     }
-    queue
+  }
+
+  private[this] val clientLock = {
+    if (deserializeArrowToRowBatchAsync) new ReentrantLock() else new NoOpLock
   }
 
   private val openParams: TScanOpenParams = {
@@ -87,7 +91,6 @@
     params.cluster = DORIS_DEFAULT_CLUSTER
     params.database = partition.getDatabase
     params.table = partition.getTable
-
     params.tablet_ids = partition.getTabletIds.toList
     params.opaqued_query_plan = partition.getQueryPlan
 
@@ -129,7 +132,6 @@
         s"execution memory limit: $execMemLimit, " +
         s"user: ${params.getUser}, " +
         s"query plan: ${params.getOpaquedQueryPlan}")
-
     params
   }
 
@@ -138,8 +140,8 @@
   protected val schema: Schema =
     SchemaUtils.convertToSchema(openResult.getSelectedColumns)
 
-  protected val asyncThread: Thread = new Thread {
-    override def run {
+  private[this] val asyncThread: Thread = new Thread {
+    override def run(): Unit = {
       val nextBatchParams = new TScanNextBatchParams
       nextBatchParams.setContextId(contextId)
       while (!eos.get) {
@@ -149,17 +151,17 @@
         if (!eos.get) {
           val rowBatch = new RowBatch(nextResult, schema)
           offset += rowBatch.getReadRowCount
-          rowBatch.close
+          rowBatch.close()
           rowBatchBlockingQueue.put(rowBatch)
         }
       }
     }
   }
 
-  protected val asyncThreadStarted: Boolean = {
+  private val asyncThreadStarted: Boolean = {
     var started = false
     if (deserializeArrowToRowBatchAsync) {
-      asyncThread.start
+      asyncThread.start()
       started = true
     }
     started
@@ -197,7 +199,7 @@
       if (!eos.get && (rowBatch == null || !rowBatch.hasNext)) {
         if (rowBatch != null) {
           offset += rowBatch.getReadRowCount
-          rowBatch.close
+          rowBatch.close()
         }
         val nextBatchParams = new TScanNextBatchParams
         nextBatchParams.setContextId(contextId)
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/EscapeHandlerTest.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/EscapeHandlerTest.java
deleted file mode 100644
index d8fb270..0000000
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/EscapeHandlerTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.spark.util;
-
-import junit.framework.TestCase;
-import org.junit.Assert;
-
-import java.util.Properties;
-
-public class EscapeHandlerTest extends TestCase {
-
-    public void testEscapeString() {
-
-
-        String s1 = "\\x09\\x09";
-        String s2 = "\\x0A\\x0A";
-        Assert.assertEquals("\t\t", EscapeHandler.escapeString(s1));
-        Assert.assertEquals("\n\n", EscapeHandler.escapeString(s2));
-
-    }
-}
\ No newline at end of file