/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.griffin.measure.datasource.connector.batch

import java.net.URI
import java.util.{Iterator => JavaIterator, Map => JavaMap}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost, HttpRequestBase}
import org.apache.http.entity.{ContentType, StringEntity}
import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._
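
/**
 * Batch connector that reads Griffin metric documents back out of
 * ElasticSearch, either through the ES SQL endpoint (when `sql.mode` is true)
 * or through a plain `_search` query against `<index>/<type>`.
 *
 * A sketch of the connector config this class reads; the keys are the ones
 * defined below, the values are purely illustrative:
 *
 * {{{
 * "config": {
 *   "host": "es.example.com",
 *   "port": "9200",
 *   "index": "griffin",
 *   "type": "accuracy",
 *   "metric.name": "my_metric",
 *   "fields": ["total", "matched"],
 *   "size": 100,
 *   "sql.mode": false
 * }
 * }}}
 */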
@deprecated(
  s"This class is deprecated. Use '${classOf[ElasticSearchDataConnector].getCanonicalName}'.",
  "0.6.0")
case class ElasticSearchGriffinDataConnector(
    @transient sparkSession: SparkSession,
    dcParam: DataConnectorParam,
    timestampStorage: TimestampStorage)
    extends BatchDataConnector {
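
  // Base URL of the target ElasticSearch node, built lazily from the
  // configured host and port (which are initialised below).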
  lazy val getBaseUrl = s"http://$host:$port"

  val config: Map[String, Any] = dcParam.getConfig
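
  // Keys recognised in the connector's config map.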
  val Index = "index"
  val Type = "type"
  val Host = "host"
  val Port = "port"
  val EsVersion = "version"
  val Fields = "fields"
  val Size = "size"
  val MetricName = "metric.name"
  val Sql = "sql"
  val SqlMode = "sql.mode"

  val index: String = config.getString(Index, "default")
  val dataType: String = config.getString(Type, "accuracy")
  val metricName: String = config.getString(MetricName, "*")
  val host: String = config.getString(Host, "")
  val version: String = config.getString(EsVersion, "")
  val port: String = config.getString(Port, "")
  val fields: Seq[String] = config.getStringArr(Fields, Seq[String]())
  val sql: String = config.getString(Sql, "")
  val sqlMode: Boolean = config.getBoolean(SqlMode, defValue = false)
  val size: Int = config.getInt(Size, 100)
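
  /**
   * Loads the batch for timestamp `ms`, dispatching on `sql.mode`: the ES SQL
   * endpoint when true, a plain `_search` query otherwise.
   */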
  override def data(ms: Long): (Option[DataFrame], TimeRange) = {
    if (sqlMode) dataBySql(ms) else dataBySearch(ms)
  }
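
  /**
   * Runs the configured SQL statement against the ES `/_sql` endpoint,
   * requesting CSV output, and parses the response into a DataFrame.
   */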
  def dataBySql(ms: Long): (Option[DataFrame], TimeRange) = {
    val path: String = "/_sql?format=csv"
    info(s"ElasticSearchGriffinDataConnector data : sql: $sql")
    val dfOpt =
      try {
        val (ok, body) = httpPost(path, sql)
        if (ok) {
          import sparkSession.implicits._
          val rdd: RDD[String] = sparkSession.sparkContext.parallelize(body.lines.toList)
          val df: DataFrame = sparkSession.read
            .option("header", value = true)
            .option("inferSchema", value = true)
            .csv(rdd.toDS())
          preProcess(Some(df), ms)
        } else None
      } catch {
        case e: Throwable =>
          error(s"load ES by sql $host:$port $sql fails: ${e.getMessage}", e)
          None
      }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
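
  /**
   * Queries `<index>/<type>/_search` for the newest documents matching the
   * configured metric name, flattens each hit's `_source.value` object into a
   * map of numeric fields, and builds a DataFrame with one double column per
   * configured field (missing fields default to 0.0).
   */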
  def dataBySearch(ms: Long): (Option[DataFrame], TimeRange) = {
    val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
    info(s"ElasticSearchGriffinDataConnector data : host: $host port: $port path:$path")
    val dfOpt =
      try {
        val (ok, body) = httpGet(path)
        val data = ArrayBuffer[Map[String, Number]]()
        if (ok) {
          val hits: JavaIterator[JsonNode] =
            parseString(body).get("hits").get("hits").elements()
          while (hits.hasNext) {
            val hit = hits.next()
            val values = hit.get("_source").get("value")
            val fieldIter: JavaIterator[JavaMap.Entry[String, JsonNode]] = values.fields()
            val fieldsMap = mutable.Map[String, Number]()
            while (fieldIter.hasNext) {
              val fld: JavaMap.Entry[String, JsonNode] = fieldIter.next()
              fieldsMap.put(fld.getKey, fld.getValue.numberValue())
            }
            data += fieldsMap.toMap
          }
        }
        val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
        val columns: Array[String] = fields.toArray
        val defaultNumber: Number = 0.0
        val rdd: RDD[Row] = rdd1.map { x: Map[String, Number] =>
          Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
        }
        val schema = dfSchema(columns.toList)
        val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
        preProcess(Some(df), ms)
      } catch {
        case e: Throwable =>
          error(s"load ES table $host:$port $index/$dataType fails: ${e.getMessage}", e)
          None
      }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
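
  /** Issues an HTTP GET against the ES base URL plus `path`. */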
  def httpGet(path: String): (Boolean, String) = {
    val url: String = s"$getBaseUrl$path"
    info(s"url:$url")
    val uri: URI = new URI(url)
    val request = new HttpGet(uri)
    doRequest(request)
  }
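
  /** Issues an HTTP POST of `body` (as an `application/json` entity) against the ES base URL plus `path`. */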
  def httpPost(path: String, body: String): (Boolean, String) = {
    val url: String = s"$getBaseUrl$path"
    info(s"url:$url")
    val uri: URI = new URI(url)
    val request = new HttpPost(uri)
    request.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON))
    doRequest(request)
  }
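
  /**
   * Executes the request with a fresh HTTP client. `BasicResponseHandler`
   * throws for non-2xx statuses, so a normal return implies success; the
   * `Boolean` in the result is therefore always `true` here, and failures
   * surface as exceptions handled by the callers.
   */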
  def doRequest(request: HttpRequestBase): (Boolean, String) = {
    request.addHeader("Content-Type", "application/json")
    request.addHeader("Charset", "UTF-8")
    val client = HttpClientBuilder.create().build()
    try {
      val response: CloseableHttpResponse = client.execute(request)
      try {
        val handler = new BasicResponseHandler()
        (true, handler.handleResponse(response).trim)
      } finally response.close()
    } finally client.close()
  }
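
  /** Parses a raw JSON string into a Jackson tree. */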
  def parseString(data: String): JsonNode = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    mapper.readTree(data)
  }
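
  /** Builds a flat schema of non-nullable double columns, one per field name. */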
  def dfSchema(columnNames: List[String]): StructType =
    StructType(
      columnNames.map(name => StructField(name, DoubleType, nullable = false)))
}