Merge pull request #530 from aleksgor/GRIFFIN-AGORSHKOV

Add Mysql, Cassandra and Elasticsearch connectors.
diff --git a/measure/pom.xml b/measure/pom.xml
index b9d93b2..d988fa3 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -46,6 +46,8 @@
         <curator.version>2.10.0</curator.version>
         <scalamock.version>3.6.0</scalamock.version>
         <spark.testing.version>0.9.0</spark.testing.version>
+        <mysql.java.version>5.1.47</mysql.java.version>
+        <cassandra.connector.version>2.4.1</cassandra.connector.version>
     </properties>
 
     <dependencies>
@@ -178,14 +180,21 @@
             <version>${spark.version}_${spark.testing.version}</version>
             <scope>test</scope>
         </dependency>
-
-
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.java.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.spark</groupId>
+            <artifactId>spark-cassandra-connector_2.11</artifactId>
+            <version>${cassandra.connector.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
             <version>4.5.9</version>
         </dependency>
-
     </dependencies>
 
     <build>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
new file mode 100644
index 0000000..87aebc1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
@@ -0,0 +1,93 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}
+
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.ParamUtil._
+
+case class CassandraDataConnector(@transient sparkSession: SparkSession,
+                                  dcParam: DataConnectorParam,
+                                  timestampStorage: TimestampStorage) extends BatchDataConnector {
+
+  val config: Map[String, Any] = dcParam.getConfig
+
+  val Database = "database"
+  val TableName = "table.name"
+  val Where = "where"
+  val Host = "host"
+  val Port = "port"
+  val User = "user"
+  val Password = "password"
+
+  val database: String = config.getString(Database, "default")
+  val tableName: String = config.getString(TableName, "")
+  val whereString: String = config.getString(Where, "")
+
+  val host: String = config.getString(Host, "localhost")
+  val port: Int = config.getInt(Port, 9042)
+  val user: String = config.getString(User, "")
+  val password: String = config.getString(Password, "")
+  val wheres: Array[String] = whereString.split(",").map(_.trim).filter(_.nonEmpty)
+
+  override def data(ms: Long): (Option[DataFrame], TimeRange) = {
+
+    val dfOpt = try {
+      sparkSession.conf.set("spark.cassandra.connection.host", host)
+      sparkSession.conf.set("spark.cassandra.connection.port", port)
+      sparkSession.conf.set("spark.cassandra.auth.username", user)
+      sparkSession.conf.set("spark.cassandra.auth.password", password)
+
+
+      val tableDef: DataFrameReader = sparkSession.read
+        .format("org.apache.spark.sql.cassandra")
+        .options(Map("table" -> tableName, "keyspace" -> database))
+
+      val dataWh: String = dataWhere()
+
+      var data: DataFrame = null
+      if (wheres.length > 0) {
+        data = tableDef.load().where(dataWh)
+      } else {
+        data = tableDef.load()
+      }
+
+      val dfOpt = Some(data)
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    } catch {
+      case e: Throwable =>
+        error(s"load cassandra table $database.$TableName fails: ${e.getMessage}", e)
+        None
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
+
+  private def dataWhere(): String = {
+    if (wheres.length > 0) {
+      wheres.mkString(" OR ")
+    } else null
+  }
+
+}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
new file mode 100644
index 0000000..6f1048b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
@@ -0,0 +1,143 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.measure.datasource.connector.batch
+
+import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader}
+import java.net.URI
+import java.util
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.ParamUtil._
+
+
+case class ElasticSearchGriffinDataConnector(@transient sparkSession: SparkSession,
+                                             dcParam: DataConnectorParam,
+                                             timestampStorage: TimestampStorage) extends BatchDataConnector {
+
+  lazy val getBaseUrl = s"http://$host:$port"
+  val config: scala.Predef.Map[scala.Predef.String, scala.Any] = dcParam.getConfig
+  val Index = "index"
+  val Type = "type"
+  val Host = "host"
+  val Port = "port"
+  val EsVersion = "version"
+  val Fields = "fields"
+  val Size = "size"
+  val MetricName = "metric.name"
+  val index: String = config.getString(Index, "default")
+  val dataType: String = config.getString(Type, "accuracy")
+  val metricName: String = config.getString(MetricName, "*")
+  val host: String = config.getString(Host, "")
+  val version: String = config.getString(EsVersion, "")
+  val port: String = config.getString(Port, "")
+  val fields: Seq[String] = config.getStringArr(Fields, Seq[String]())
+  val size: Int = config.getInt(Size, 100)
+
+  override def data(ms: Long): (Option[DataFrame], TimeRange) = {
+
+    val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
+    info(s"ElasticSearchGriffinDataConnector data : host: $host port: $port path:$path")
+
+    val dfOpt = try {
+      val answer = httpRequest(path)
+      val data = ArrayBuffer[Map[String, Number]]()
+
+      if (answer._1) {
+        val arrayAnswers: util.Iterator[JsonNode] = parseString(answer._2).get("hits").get("hits").elements()
+
+        while (arrayAnswers.hasNext) {
+          val answer = arrayAnswers.next()
+          val values = answer.get("_source").get("value")
+          val fields: util.Iterator[util.Map.Entry[String, JsonNode]] = values.fields()
+          val fieldsMap = mutable.Map[String, Number]()
+          while (fields.hasNext) {
+            val fld: util.Map.Entry[String, JsonNode] = fields.next()
+            fieldsMap.put(fld.getKey, fld.getValue.numberValue())
+          }
+          data += fieldsMap.toMap
+        }
+      }
+      val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
+      val columns: Array[String] = fields.toArray
+      val defaultNumber: Number = 0.0
+      val rdd: RDD[Row] = rdd1
+        .map { x: Map[String, Number] =>
+          Row(
+            columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
+        }
+      val schema = dfSchema(columns.toList)
+      val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
+      df.show(20)
+      val dfOpt = Some(df)
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    } catch {
+      case e: Throwable =>
+        error(s"load ES table $host:$port $index/$dataType  fails: ${e.getMessage}", e)
+        None
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
+
+  def httpRequest(path: String): (Boolean, String) = {
+
+    val url: String = s"$getBaseUrl$path"
+    info(s"url:$url")
+    val uri: URI = new URI(url)
+    val request = new org.apache.http.client.methods.HttpGet(uri)
+    request.addHeader("Content-Type", "application/json")
+    request.addHeader("Charset", "UTF-8")
+    val client = HttpClientBuilder.create().build()
+    val response: CloseableHttpResponse = client.execute(request)
+    val handler = new BasicResponseHandler()
+    (true, handler.handleResponse(response).trim)
+
+  }
+
+  def parseString(data: String): JsonNode = {
+    val mapper = new ObjectMapper()
+    mapper.registerModule(DefaultScalaModule)
+    val reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data.getBytes)))
+    mapper.readTree(reader)
+  }
+
+  def dfSchema(columnNames: List[String]): StructType = {
+    val a: Seq[StructField] = columnNames
+      .map(x => StructField(name = x,
+        dataType = org.apache.spark.sql.types.DoubleType,
+        nullable = false))
+    StructType(a)
+  }
+
+}
\ No newline at end of file
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
new file mode 100644
index 0000000..a893027
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.ParamUtil._
+
+case class MySqlDataConnector(@transient sparkSession: SparkSession,
+                              dcParam: DataConnectorParam,
+                              timestampStorage: TimestampStorage) extends BatchDataConnector {
+
+  val Database = "database"
+  val TableName = "table.name"
+  val Where = "where"
+  val Url = "url"
+  val User = "user"
+  val Password = "password"
+  val Driver = "driver"
+
+  val database: String = dcParam.getConfig.getString(Database, "default")
+  val tableName: String = dcParam.getConfig.getString(TableName, "")
+  val fullTableName: String = s"$database.$tableName"
+  val whereString: String = dcParam.getConfig.getString(Where, "")
+  val url: String = dcParam.getConfig.getString(Url, "")
+  val user: String = dcParam.getConfig.getString(User, "")
+  val password: String = dcParam.getConfig.getString(Password, "")
+  val driver: String = dcParam.getConfig.getString(Driver, "com.mysql.jdbc.Driver")
+
+  override def data(ms: Long): (Option[DataFrame], TimeRange) = {
+
+
+    val dfOpt = try {
+      val dtSql = dataSql()
+      val prop = new java.util.Properties
+      prop.setProperty("user", user)
+      prop.setProperty("password", password)
+      prop.setProperty("driver", driver)
+      val df: DataFrame = sparkSession.read.jdbc(url, s"($dtSql) as t", prop)
+      val dfOpt = Some(df)
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    } catch {
+      case e: Throwable =>
+        error(s"load mysql table $fullTableName fails: ${e.getMessage}", e)
+        None
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
+
+  private def dataSql(): String = {
+
+    val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
+    val tableClause = s"SELECT * FROM $fullTableName"
+    if (wheres.length > 0) {
+      val clauses = wheres.map { w =>
+        s"$tableClause WHERE $w"
+      }
+      clauses.mkString(" UNION ALL ")
+    } else tableClause
+  }
+}
\ No newline at end of file