[GRIFFIN-322] Add SQL mode for ES connector
As [GRIFFIN-322](https://issues.apache.org/jira/projects/GRIFFIN/issues/GRIFFIN-322?filter=allopenissues) , we want to add a SQL mode for the ES connector.
**The SQL mode would be more effective and user-friendly.**
Current mode config:
{ "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector",
"index": "index-xxx",
"type": "metric",
"host": "xxxxxxxxxx",
"port": "xxxx",
"fields": ["col_a", "col_b", "col_c"],
"size": 100}
SQL mode config:
{ "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector",
"sql.mode": true,
"host": "xxxxxxxxxx",
"port": "xxxx",
"sql": "select col_a, col_b, col_c from index-xx limit 100"}
Compared with the current mode, SQL mode can support other data types besides the number type.
Author: yuxiaoyu <yuxiaoyu@bytedance.com>
Closes #567 from XiaoyuBD/enrichEsConnectorAddSqlMode.
diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index d1a6c36..4d7594c 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -248,6 +248,62 @@
**Note:** User-provided data connector should be present in Spark job's class path, by either providing custom jar with
`--jars` parameter to spark-submit or by adding setting `spark.jars` in `spark -> config` section of environment config.
+ ##### For ElasticSearch Custom Data Connectors:
+ - Currently supports SQL mode (for ElasticSearch with the SQL plugin) and NORMAL mode.
+ - For NORMAL mode, config object supports the following keys,
+
+ | Name | Type | Description | Default Values |
+ |:-----------|:---------|:---------------------------------------|:-------------- |
+ | index | `String` | ElasticSearch index name| default |
+ | type | `String` | ElasticSearch data type | accuracy |
+ | host | `String` | ElasticSearch url host | `Empty` |
+ | port | `String` | ElasticSearch url port | `Empty` |
+ | fields | `List` | list of columns | `Empty` |
+ | size | `Integer`| data size (lines) to load | 100 |
+ | metric.name| `String` | metric name to load | * |
+
+ - Example:
+ ```
+ "connectors": [
+ {
+ "type": "custom",
+ "config": {
+ "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector",
+ "index": "test-index-v1",
+ "type": "metric",
+ "host": "test.es-xxx.org",
+ "port": "80",
+ "fields": ["col_a", "col_b", "col_c"],
+ "size": 20
+ }
+ }
+ ]
+ ```
+ - For SQL mode, config object supports the following keys,
+
+ | Name | Type | Description | Default Values |
+ |:-----------|:---------|:---------------------------------------|:-------------- |
+ | host | `String` | ElasticSearch url host | `Empty` |
+ | port | `String` | ElasticSearch url port | `Empty` |
+ | sql.mode | `Boolean`| use sql mode | false |
+ | sql | `String` | ElasticSearch SQL | `Empty` |
+
+ - Example:
+ ```
+ "connectors": [
+ {
+ "type": "custom",
+ "config": {
+ "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector",
+ "host": "test.es-xxx.org",
+ "port": "80",
+ "sql.mode": true,
+ "sql": "select col_a, col_b, col_c from test-index-v1 limit 20"
+ }
+ }
+ ]
+ ```
+
##### For File based Data Connectors:
- Currently supported formats like Parquet, ORC, AVRO, Text and Delimited types like CSV, TSV etc.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
index b70ba51..93ccb79 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
@@ -26,10 +26,11 @@
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost, HttpRequestBase}
+import org.apache.http.entity.{ContentType, StringEntity}
import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, DataFrameReader, Row, SparkSession}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
@@ -53,6 +54,8 @@
val Fields = "fields"
val Size = "size"
val MetricName = "metric.name"
+ val Sql = "sql"
+ val SqlMode = "sql.mode"
val index: String = config.getString(Index, "default")
val dataType: String = config.getString(Type, "accuracy")
val metricName: String = config.getString(MetricName, "*")
@@ -60,15 +63,44 @@
val version: String = config.getString(EsVersion, "")
val port: String = config.getString(Port, "")
val fields: Seq[String] = config.getStringArr(Fields, Seq[String]())
+ val sql: String = config.getString(Sql, "")
+ val sqlMode: Boolean = config.getBoolean(SqlMode, false)
val size: Int = config.getInt(Size, 100)
override def data(ms: Long): (Option[DataFrame], TimeRange) = {
+ if (sqlMode) dataBySql(ms) else dataBySearch(ms)
+ }
+ def dataBySql(ms: Long): (Option[DataFrame], TimeRange) = {
+ val path: String = s"/_sql?format=csv"
+ info(s"ElasticSearchGriffinDataConnector data : sql: $sql")
+ val dfOpt = try {
+ val answer = httpPost(path, sql)
+ if (answer._1) {
+ import sparkSession.implicits._
+ val rdd: RDD[String] = sparkSession.sparkContext.parallelize(answer._2.lines.toList)
+ val reader: DataFrameReader = sparkSession.read
+ reader.option("header", true).option("inferSchema", true)
+ val df: DataFrame = reader.csv(rdd.toDS())
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } else None
+ } catch {
+ case e: Throwable =>
+ error(s"load ES by sql $host:$port $sql fails: ${e.getMessage}", e)
+ None
+ }
+ val tmsts = readTmst(ms)
+ (dfOpt, TimeRange(ms, tmsts))
+ }
+
+ def dataBySearch(ms: Long): (Option[DataFrame], TimeRange) = {
val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
info(s"ElasticSearchGriffinDataConnector data : host: $host port: $port path:$path")
val dfOpt = try {
- val answer = httpRequest(path)
+ val answer = httpGet(path)
val data = ArrayBuffer[Map[String, Number]]()
if (answer._1) {
@@ -109,19 +141,30 @@
(dfOpt, TimeRange(ms, tmsts))
}
- def httpRequest(path: String): (Boolean, String) = {
-
+ def httpGet(path: String): (Boolean, String) = {
val url: String = s"$getBaseUrl$path"
info(s"url:$url")
val uri: URI = new URI(url)
- val request = new org.apache.http.client.methods.HttpGet(uri)
+ val request = new HttpGet(uri)
+ doRequest(request)
+ }
+
+ def httpPost(path: String, body: String): (Boolean, String) = {
+ val url: String = s"$getBaseUrl$path"
+ info(s"url:$url")
+ val uri: URI = new URI(url)
+ val request = new HttpPost(uri)
+ request.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON))
+ doRequest(request)
+ }
+
+ def doRequest(request: HttpRequestBase): (Boolean, String) = {
request.addHeader("Content-Type", "application/json")
request.addHeader("Charset", "UTF-8")
val client = HttpClientBuilder.create().build()
val response: CloseableHttpResponse = client.execute(request)
val handler = new BasicResponseHandler()
(true, handler.handleResponse(response).trim)
-
}
def parseString(data: String): JsonNode = {