[GRIFFIN-322] Add SQL mode for ES connector

As described in [GRIFFIN-322](https://issues.apache.org/jira/projects/GRIFFIN/issues/GRIFFIN-322?filter=allopenissues), we want to add a SQL mode for the ElasticSearch connector.

**The SQL mode would be more effective and user-friendly.**

Current mode config:

```
{
  "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector",
  "index": "index-xxx",
  "type": "metric",
  "host": "xxxxxxxxxx",
  "port": "xxxx",
  "fields": ["col_a", "col_b", "col_c"],
  "size": 100
}
```

SQL mode config:

```
{
  "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector",
  "sql.mode": true,
  "host": "xxxxxxxxxx",
  "port": "xxxx",
  "sql": "select col_a, col_b, col_c from index-xx limit 100"
}
```

Compared with the current mode, which only loads numeric values, SQL mode can also handle non-numeric column types.
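
Under the hood, SQL mode posts the statement to the `/_sql?format=csv` endpoint and loads the CSV response as a DataFrame. Below is a minimal sketch of that request, mirroring the `httpPost` added in this patch; the object name is illustrative, the `http://host:port` base-URL form is an assumption, and the host, port, and SQL are the placeholders from the config above.

```
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.{ContentType, StringEntity}
import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}

// Illustrative only: replays the request SQL mode sends to ElasticSearch.
object EsSqlModeSketch {
  def main(args: Array[String]): Unit = {
    // the SQL statement is sent as the request body to the SQL plugin endpoint
    val request = new HttpPost("http://xxxxxxxxxx:xxxx/_sql?format=csv")
    request.setEntity(new StringEntity(
      "select col_a, col_b, col_c from index-xx limit 100",
      ContentType.APPLICATION_JSON))
    val client = HttpClientBuilder.create().build()
    // the response is CSV text: a header row of column names, then data rows,
    // which the connector parses into a Spark DataFrame with an inferred schema
    val csv = new BasicResponseHandler().handleResponse(client.execute(request))
    println(csv)
  }
}
```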

Author: yuxiaoyu <yuxiaoyu@bytedance.com>

Closes #567 from XiaoyuBD/enrichEsConnectorAddSqlMode.
diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index d1a6c36..4d7594c 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -248,6 +248,62 @@
  **Note:** User-provided data connector should be present in Spark job's class path, by either providing custom jar with 
 `--jars` parameter to spark-submit or by adding setting `spark.jars` in `spark -> config` section of environment config.  
 
+ ##### For ElasticSearch Custom Data Connectors:
+  - Two modes are currently supported: SQL mode (for ElasticSearch with the SQL plugin) and NORMAL mode.
+  - For NORMAL mode, the config object supports the following keys:
+  
+ | Name        | Type      | Description               | Default Values |
+ |:------------|:----------|:--------------------------|:---------------|
+ | index       | `String`  | ElasticSearch index name  | default        |
+ | type        | `String`  | ElasticSearch data type   | accuracy       |
+ | host        | `String`  | ElasticSearch URL host    | `Empty`        |
+ | port        | `String`  | ElasticSearch URL port    | `Empty`        |
+ | fields      | `List`    | list of columns to load   | `Empty`        |
+ | size        | `Integer` | number of rows to load    | 100            |
+ | metric.name | `String`  | metric name to load       | *              |
+
+ - Example:
+      ```
+     "connectors": [
+       { 
+         "type": "custom",
+         "config": {
+           "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector",
+           "index": "test-index-v1",
+           "type": "metric",
+           "host": "test.es-xxx.org",
+           "port": "80",
+           "fields": ["col_a", "col_b", "col_c"],
+           "size": 20
+         }
+       }
+     ]
+      ```
+  - For SQL mode, the config object supports the following keys:
+  
+ | Name     | Type      | Description             | Default Values |
+ |:---------|:----------|:------------------------|:---------------|
+ | host     | `String`  | ElasticSearch URL host  | `Empty`        |
+ | port     | `String`  | ElasticSearch URL port  | `Empty`        |
+ | sql.mode | `Boolean` | whether to use SQL mode | false          |
+ | sql      | `String`  | ElasticSearch SQL query | `Empty`        |
+
+ - Example:
+      ```
+     "connectors": [
+       { 
+         "type": "custom",
+         "config": {
+           "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector",
+           "host": "test.es-xxx.org",
+           "port": "80",
+           "sql.mode": true,
+           "sql": "select col_a, col_b, col_c from test-index-v1 limit 20"
+         }
+       }
+     ]
+      ```
+ 
  ##### For File based Data Connectors:
 
  - Currently supported formats like Parquet, ORC, AVRO, Text and Delimited types like CSV, TSV etc.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
index b70ba51..93ccb79 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
@@ -26,10 +26,11 @@
 
 import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost, HttpRequestBase}
+import org.apache.http.entity.{ContentType, StringEntity}
 import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, DataFrameReader, Row, SparkSession}
 import org.apache.spark.sql.types.{StructField, StructType}
 
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
@@ -53,6 +54,8 @@
   val Fields = "fields"
   val Size = "size"
   val MetricName = "metric.name"
+  val Sql = "sql"
+  val SqlMode = "sql.mode"
   val index: String = config.getString(Index, "default")
   val dataType: String = config.getString(Type, "accuracy")
   val metricName: String = config.getString(MetricName, "*")
@@ -60,15 +63,44 @@
   val version: String = config.getString(EsVersion, "")
   val port: String = config.getString(Port, "")
   val fields: Seq[String] = config.getStringArr(Fields, Seq[String]())
+  val sql: String = config.getString(Sql, "")
+  val sqlMode: Boolean = config.getBoolean(SqlMode, false)
   val size: Int = config.getInt(Size, 100)
 
   override def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    if (sqlMode) dataBySql(ms) else dataBySearch(ms)
+  }
 
+  def dataBySql(ms: Long): (Option[DataFrame], TimeRange) = {
+    val path: String = s"/_sql?format=csv"
+    info(s"ElasticSearchGriffinDataConnector data : sql: $sql")
+    val dfOpt = try {
+      val answer = httpPost(path, sql)
+      if (answer._1) {
+        import sparkSession.implicits._
+        val rdd: RDD[String] = sparkSession.sparkContext.parallelize(answer._2.lines.toList)
+        val reader: DataFrameReader = sparkSession.read
+        reader.option("header", true).option("inferSchema", true)
+        val df: DataFrame = reader.csv(rdd.toDS())
+        val dfOpt = Some(df)
+        val preDfOpt = preProcess(dfOpt, ms)
+        preDfOpt
+      } else None
+    } catch {
+      case e: Throwable =>
+        error(s"load ES by sql $host:$port $sql  fails: ${e.getMessage}", e)
+        None
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
+
+  def dataBySearch(ms: Long): (Option[DataFrame], TimeRange) = {
     val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
     info(s"ElasticSearchGriffinDataConnector data : host: $host port: $port path:$path")
 
     val dfOpt = try {
-      val answer = httpRequest(path)
+      val answer = httpGet(path)
       val data = ArrayBuffer[Map[String, Number]]()
 
       if (answer._1) {
@@ -109,19 +141,30 @@
     (dfOpt, TimeRange(ms, tmsts))
   }
 
-  def httpRequest(path: String): (Boolean, String) = {
-
+  def httpGet(path: String): (Boolean, String) = {
     val url: String = s"$getBaseUrl$path"
     info(s"url:$url")
     val uri: URI = new URI(url)
-    val request = new org.apache.http.client.methods.HttpGet(uri)
+    val request = new HttpGet(uri)
+    doRequest(request)
+  }
+
+  def httpPost(path: String, body: String): (Boolean, String) = {
+    val url: String = s"$getBaseUrl$path"
+    info(s"url:$url")
+    val uri: URI = new URI(url)
+    val request = new HttpPost(uri)
+    request.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON))
+    doRequest(request)
+  }
+
+  def doRequest(request: HttpRequestBase): (Boolean, String) = {
     request.addHeader("Content-Type", "application/json")
     request.addHeader("Charset", "UTF-8")
     val client = HttpClientBuilder.create().build()
     val response: CloseableHttpResponse = client.execute(request)
     val handler = new BasicResponseHandler()
     (true, handler.handleResponse(response).trim)
-
   }
 
   def parseString(data: String): JsonNode = {