Merge branch 'master' into GRIFFIN-AGORSHKOV
diff --git a/measure/pom.xml b/measure/pom.xml
index b9d93b2..3facd08 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -46,6 +46,9 @@
<curator.version>2.10.0</curator.version>
<scalamock.version>3.6.0</scalamock.version>
<spark.testing.version>0.9.0</spark.testing.version>
+ <mysql.java.version>5.1.47</mysql.java.version>
+ <commons.httpclient.version>3.1</commons.httpclient.version>
+ <cassandra.connector.version>2.4.1</cassandra.connector.version>
</properties>
<dependencies>
@@ -178,14 +181,26 @@
<version>${spark.version}_${spark.testing.version}</version>
<scope>test</scope>
</dependency>
-
-
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql.java.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.datastax.spark</groupId>
+ <artifactId>spark-cassandra-connector_2.11</artifactId>
+ <version>${cassandra.connector.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ <version>${commons.httpclient.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.9</version>
</dependency>
-
</dependencies>
<build>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
new file mode 100644
index 0000000..87aebc1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
@@ -0,0 +1,93 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}
+
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.ParamUtil._
+
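+/**
+ * Batch connector that reads a Cassandra table via the DataStax Spark
+ * Cassandra connector. An illustrative config block (keys match the
+ * constants below; host, keyspace and values are hypothetical):
+ *
+ * {{{
+ * "config": {
+ *   "host": "cassandra.example.com",
+ *   "port": 9042,
+ *   "database": "my_keyspace",
+ *   "table.name": "my_table",
+ *   "user": "user",
+ *   "password": "secret",
+ *   "where": "year = 2019, month = 7"
+ * }
+ * }}}
+ *
+ * Comma-separated entries in "where" are OR-ed together into a single
+ * filter on the loaded DataFrame.
+ */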
+case class CassandraDataConnector(@transient sparkSession: SparkSession,
+ dcParam: DataConnectorParam,
+ timestampStorage: TimestampStorage) extends BatchDataConnector {
+
+ val config: Map[String, Any] = dcParam.getConfig
+
+ val Database = "database"
+ val TableName = "table.name"
+ val Where = "where"
+ val Host = "host"
+ val Port = "port"
+ val User = "user"
+ val Password = "password"
+
+ val database: String = config.getString(Database, "default")
+ val tableName: String = config.getString(TableName, "")
+ val whereString: String = config.getString(Where, "")
+
+ val host: String = config.getString(Host, "localhost")
+ val port: Int = config.getInt(Port, 9042)
+ val user: String = config.getString(User, "")
+ val password: String = config.getString(Password, "")
+ val wheres: Array[String] = whereString.split(",").map(_.trim).filter(_.nonEmpty)
+
+ override def data(ms: Long): (Option[DataFrame], TimeRange) = {
+
+ val dfOpt = try {
+ sparkSession.conf.set("spark.cassandra.connection.host", host)
+ sparkSession.conf.set("spark.cassandra.connection.port", port)
+ sparkSession.conf.set("spark.cassandra.auth.username", user)
+ sparkSession.conf.set("spark.cassandra.auth.password", password)
+
+ val tableDef: DataFrameReader = sparkSession.read
+ .format("org.apache.spark.sql.cassandra")
+ .options(Map("table" -> tableName, "keyspace" -> database))
+
+      // Apply the OR-ed where clauses only when at least one was configured.
+      val data: DataFrame =
+        if (wheres.nonEmpty) tableDef.load().where(dataWhere())
+        else tableDef.load()
+
+      preProcess(Some(data), ms)
+ } catch {
+ case e: Throwable =>
+ error(s"load cassandra table $database.$TableName fails: ${e.getMessage}", e)
+ None
+ }
+ val tmsts = readTmst(ms)
+ (dfOpt, TimeRange(ms, tmsts))
+ }
+
+  // Joins the configured where clauses with OR. Only invoked when at least
+  // one clause is present, so the result is never used empty.
+  private def dataWhere(): String = wheres.mkString(" OR ")
+
+}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
new file mode 100644
index 0000000..6f1048b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
@@ -0,0 +1,143 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.measure.datasource.connector.batch
+
+import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader}
+import java.net.URI
+import java.util
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.ParamUtil._
+
+
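+/**
+ * Batch connector that reads Griffin metric documents back out of
+ * Elasticsearch through its REST search API and exposes the numeric metric
+ * values as an all-double DataFrame. An illustrative config block (keys
+ * match the constants below; host, index and values are hypothetical):
+ *
+ * {{{
+ * "config": {
+ *   "host": "es.example.com",
+ *   "port": "9200",
+ *   "index": "griffin",
+ *   "type": "accuracy",
+ *   "metric.name": "my_measure",
+ *   "fields": ["total", "matched"],
+ *   "size": 100
+ * }
+ * }}}
+ */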
+case class ElasticSearchGriffinDataConnector(@transient sparkSession: SparkSession,
+ dcParam: DataConnectorParam,
+ timestampStorage: TimestampStorage) extends BatchDataConnector {
+
+  lazy val getBaseUrl: String = s"http://$host:$port"
+  val config: Map[String, Any] = dcParam.getConfig
+ val Index = "index"
+ val Type = "type"
+ val Host = "host"
+ val Port = "port"
+ val EsVersion = "version"
+ val Fields = "fields"
+ val Size = "size"
+ val MetricName = "metric.name"
+ val index: String = config.getString(Index, "default")
+ val dataType: String = config.getString(Type, "accuracy")
+ val metricName: String = config.getString(MetricName, "*")
+ val host: String = config.getString(Host, "")
+ val version: String = config.getString(EsVersion, "")
+ val port: String = config.getString(Port, "")
+ val fields: Seq[String] = config.getStringArr(Fields, Seq[String]())
+ val size: Int = config.getInt(Size, 100)
+
+ override def data(ms: Long): (Option[DataFrame], TimeRange) = {
+
+ val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
+ info(s"ElasticSearchGriffinDataConnector data : host: $host port: $port path:$path")
+
+ val dfOpt = try {
+      val (success, body) = httpRequest(path)
+      val data = ArrayBuffer[Map[String, Number]]()
+
+      if (success) {
+        // Each hit carries the metric values under _source.value.
+        val hits: util.Iterator[JsonNode] = parseString(body).get("hits").get("hits").elements()
+
+        while (hits.hasNext) {
+          val hit = hits.next()
+          val values = hit.get("_source").get("value")
+          val valueFields: util.Iterator[util.Map.Entry[String, JsonNode]] = values.fields()
+          val fieldsMap = mutable.Map[String, Number]()
+          while (valueFields.hasNext) {
+            val fld: util.Map.Entry[String, JsonNode] = valueFields.next()
+            fieldsMap.put(fld.getKey, fld.getValue.numberValue())
+          }
+          data += fieldsMap.toMap
+        }
+      }
+ val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
+ val columns: Array[String] = fields.toArray
+ val defaultNumber: Number = 0.0
+ val rdd: RDD[Row] = rdd1
+ .map { x: Map[String, Number] =>
+ Row(
+ columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
+ }
+ val schema = dfSchema(columns.toList)
+      val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
+      preProcess(Some(df), ms)
+ } catch {
+ case e: Throwable =>
+ error(s"load ES table $host:$port $index/$dataType fails: ${e.getMessage}", e)
+ None
+ }
+ val tmsts = readTmst(ms)
+ (dfOpt, TimeRange(ms, tmsts))
+ }
+
+  def httpRequest(path: String): (Boolean, String) = {
+    val url: String = s"$getBaseUrl$path"
+    info(s"url:$url")
+    val request = new org.apache.http.client.methods.HttpGet(new URI(url))
+    request.addHeader("Content-Type", "application/json")
+    request.addHeader("Charset", "UTF-8")
+    // Close both the response and the client so connections are not leaked.
+    val client = HttpClientBuilder.create().build()
+    try {
+      val response: CloseableHttpResponse = client.execute(request)
+      try {
+        (true, new BasicResponseHandler().handleResponse(response).trim)
+      } finally {
+        response.close()
+      }
+    } finally {
+      client.close()
+    }
+  }
+
+ def parseString(data: String): JsonNode = {
+ val mapper = new ObjectMapper()
+ mapper.registerModule(DefaultScalaModule)
+ val reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data.getBytes)))
+ mapper.readTree(reader)
+ }
+
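+  // Builds a flat, non-nullable all-DoubleType schema from the configured
+  // field names, e.g. List("total", "matched") => two DoubleType columns.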
+ def dfSchema(columnNames: List[String]): StructType = {
+ val a: Seq[StructField] = columnNames
+ .map(x => StructField(name = x,
+ dataType = org.apache.spark.sql.types.DoubleType,
+ nullable = false))
+ StructType(a)
+ }
+
+}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
new file mode 100644
index 0000000..a893027
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.ParamUtil._
+
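+/**
+ * Batch connector that reads a MySQL table over JDBC. An illustrative
+ * config block (keys match the constants below; url and credentials are
+ * hypothetical; "driver" is optional and defaults to com.mysql.jdbc.Driver):
+ *
+ * {{{
+ * "config": {
+ *   "url": "jdbc:mysql://mysql.example.com:3306",
+ *   "database": "my_db",
+ *   "table.name": "my_table",
+ *   "user": "user",
+ *   "password": "secret",
+ *   "where": "id < 100, id > 900"
+ * }
+ * }}}
+ *
+ * Comma-separated entries in "where" become one SELECT each, combined
+ * with UNION ALL (see dataSql below).
+ */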
+case class MySqlDataConnector(@transient sparkSession: SparkSession,
+ dcParam: DataConnectorParam,
+ timestampStorage: TimestampStorage) extends BatchDataConnector {
+
+ val Database = "database"
+ val TableName = "table.name"
+ val Where = "where"
+ val Url = "url"
+ val User = "user"
+ val Password = "password"
+ val Driver = "driver"
+
+ val database: String = dcParam.getConfig.getString(Database, "default")
+ val tableName: String = dcParam.getConfig.getString(TableName, "")
+ val fullTableName: String = s"$database.$tableName"
+ val whereString: String = dcParam.getConfig.getString(Where, "")
+ val url: String = dcParam.getConfig.getString(Url, "")
+ val user: String = dcParam.getConfig.getString(User, "")
+ val password: String = dcParam.getConfig.getString(Password, "")
+ val driver: String = dcParam.getConfig.getString(Driver, "com.mysql.jdbc.Driver")
+
+ override def data(ms: Long): (Option[DataFrame], TimeRange) = {
+
+ val dfOpt = try {
+ val dtSql = dataSql()
+ val prop = new java.util.Properties
+ prop.setProperty("user", user)
+ prop.setProperty("password", password)
+ prop.setProperty("driver", driver)
+      val df: DataFrame = sparkSession.read.jdbc(url, s"($dtSql) as t", prop)
+      preProcess(Some(df), ms)
+ } catch {
+ case e: Throwable =>
+ error(s"load mysql table $fullTableName fails: ${e.getMessage}", e)
+ None
+ }
+ val tmsts = readTmst(ms)
+ (dfOpt, TimeRange(ms, tmsts))
+ }
+
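+  // Example with hypothetical values: where = "id < 100, id > 900" expands to
+  // "SELECT * FROM db.tbl WHERE id < 100 UNION ALL SELECT * FROM db.tbl WHERE id > 900".
+  // Note that rows matching more than one clause are duplicated by UNION ALL.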
+ private def dataSql(): String = {
+
+ val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
+ val tableClause = s"SELECT * FROM $fullTableName"
+ if (wheres.length > 0) {
+ val clauses = wheres.map { w =>
+ s"$tableClause WHERE $w"
+ }
+ clauses.mkString(" UNION ALL ")
+ } else tableClause
+ }
+}