/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
*/
| |
| package org.apache.griffin.measure.datasource.connector.batch |
| |
import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader}
import java.net.URI
import java.util

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.http.client.methods.CloseableHttpResponse
import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StructField, StructType}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._
| |
| |
/**
 * Batch data connector that reads Griffin metric documents from an
 * Elasticsearch index over plain HTTP and exposes them as a numeric
 * [[DataFrame]] (one double column per configured field).
 *
 * Relevant config keys: "index", "type", "host", "port", "version",
 * "fields", "size", "metric.name".
 */
case class ElasticSearchGriffinDataConnector(@transient sparkSession: SparkSession,
                                             dcParam: DataConnectorParam,
                                             timestampStorage: TimestampStorage)
  extends BatchDataConnector {

  // Lazy so that `host` / `port` (declared below) are initialized first.
  lazy val getBaseUrl: String = s"http://$host:$port"

  val config: Map[String, Any] = dcParam.getConfig

  // Configuration key names.
  val Index = "index"
  val Type = "type"
  val Host = "host"
  val Port = "port"
  val EsVersion = "version"
  val Fields = "fields"
  val Size = "size"
  val MetricName = "metric.name"

  val index: String = config.getString(Index, "default")
  val dataType: String = config.getString(Type, "accuracy")
  val metricName: String = config.getString(MetricName, "*")
  val host: String = config.getString(Host, "")
  val version: String = config.getString(EsVersion, "")
  val port: String = config.getString(Port, "")
  val fields: Seq[String] = config.getStringArr(Fields, Seq[String]())
  val size: Int = config.getInt(Size, 100)

  /**
   * Queries ES for the newest `size` metric documents named `metricName`,
   * flattens each hit's `_source.value` object into a row of doubles
   * (missing fields default to 0.0), and pre-processes the resulting frame.
   *
   * @param ms the batch timestamp in milliseconds
   * @return the optional DataFrame (None on failure) and the time range read
   */
  override def data(ms: Long): (Option[DataFrame], TimeRange) = {

    val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
    info(s"ElasticSearchGriffinDataConnector data : host: $host port: $port path:$path")

    val dfOpt = try {
      val (ok, body) = httpRequest(path)
      val data = ArrayBuffer[Map[String, Number]]()

      if (ok) {
        // ES search response shape: { "hits": { "hits": [ { "_source": { "value": {...} } } ] } }
        val hits: util.Iterator[JsonNode] =
          parseString(body).get("hits").get("hits").elements()

        while (hits.hasNext) {
          val hit = hits.next()
          val values = hit.get("_source").get("value")
          val valueFields: util.Iterator[util.Map.Entry[String, JsonNode]] = values.fields()
          val fieldsMap = mutable.Map[String, Number]()
          while (valueFields.hasNext) {
            val fld: util.Map.Entry[String, JsonNode] = valueFields.next()
            fieldsMap.put(fld.getKey, fld.getValue.numberValue())
          }
          data += fieldsMap.toMap
        }
      }

      val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
      val columns: Array[String] = fields.toArray
      val defaultNumber: Number = 0.0
      // Project each document onto the configured columns, as doubles.
      val rdd: RDD[Row] = rdd1
        .map { x: Map[String, Number] =>
          Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
        }
      val schema = dfSchema(columns.toList)
      val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
      df.show(20)
      preProcess(Some(df), ms)
    } catch {
      // NonFatal: let OutOfMemoryError, InterruptedException, etc. propagate.
      case NonFatal(e) =>
        error(s"load ES table $host:$port $index/$dataType fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }

  /**
   * Performs a GET against the ES cluster and returns (success, body).
   * Client and response are always closed to avoid leaking the connection
   * pool on every call.
   *
   * @param path request path (including query string), appended to the base URL
   */
  def httpRequest(path: String): (Boolean, String) = {

    val url: String = s"$getBaseUrl$path"
    info(s"url:$url")
    val uri: URI = new URI(url)
    val request = new org.apache.http.client.methods.HttpGet(uri)
    request.addHeader("Content-Type", "application/json")
    request.addHeader("Charset", "UTF-8")

    val client = HttpClientBuilder.create().build()
    try {
      val response: CloseableHttpResponse = client.execute(request)
      try {
        // BasicResponseHandler throws HttpResponseException on status >= 300.
        val handler = new BasicResponseHandler()
        (true, handler.handleResponse(response).trim)
      } finally {
        response.close()
      }
    } finally {
      client.close()
    }
  }

  /** Parses a JSON string into a Jackson tree. */
  def parseString(data: String): JsonNode = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    // readTree(String) parses directly; no intermediate stream/reader (and no
    // platform-default-charset getBytes round trip) is needed.
    mapper.readTree(data)
  }

  /** Builds a flat all-DoubleType, non-nullable schema for the given column names. */
  def dfSchema(columnNames: List[String]): StructType = {
    val structFields: Seq[StructField] = columnNames
      .map(name => StructField(name = name,
        dataType = org.apache.spark.sql.types.DoubleType,
        nullable = false))
    StructType(structFields)
  }

}