[GRIFFIN-315] Adding JDBC based data connector

**What changes were proposed in this pull request?**

JDBC based data connector to read data from different JDBC based data sources.

**Does this PR introduce any user-facing change?**
No

**How was this patch tested?**
Griffin test suite.

Author: tusharpatil20 <tushargp20@gmail.com>

Closes #561 from tusharpatil20/JDBCBased-source-connector.
diff --git a/measure/pom.xml b/measure/pom.xml
index ce198bb..be98435 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -201,6 +201,12 @@
             <artifactId>httpclient</artifactId>
             <version>4.5.9</version>
         </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>1.4.200</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index 2578470..deb9bd8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -40,6 +40,7 @@
   val KafkaRegex: Regex = """^(?i)kafka$""".r
 
   val CustomRegex: Regex = """^(?i)custom$""".r
+  val JDBCRegex: Regex = """^(?i)jdbc$""".r
 
   /**
    * create data connector
@@ -72,6 +73,7 @@
             dcParam,
             tmstCache,
             streamingCacheClientOpt)
+        case JDBCRegex() => JDBCBasedDataConnector(sparkSession, dcParam, tmstCache)
         case _ => throw new Exception("connector creation error!")
       }
     }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
new file mode 100644
index 0000000..e7efa6e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * A batch data connector for JDBC based source which allows support for various
+ * JDBC based data sources like Oracle. Postgres etc.
+ *
+ * Supported Configurations:
+ *  - database : [[String]] specifying the database name.
+ *  - tablename : [[String]] specifying the table name to be read
+ *  - url : [[String]] specifying the URL to connect to database
+ *  - user : [[String]] specifying the user for connection to database
+ *  - password: [[String]] specifying the password for connection to database
+ *  - driver : [[String]] specifying the driver for JDBC connection to database
+ *  - where : [[String]] specifying the condition for reading data from table
+ *
+ * Some defaults assumed by this connector (if not set) are as follows:
+ *  - `database` is default,
+ *  - `driver` is com.mysql.jdbc.Driver,
+ *  - `where` is None
+ */
+case class JDBCBasedDataConnector(
+    @transient sparkSession: SparkSession,
+    dcParam: DataConnectorParam,
+    timestampStorage: TimestampStorage)
+    extends BatchDataConnector {
+
+  import JDBCBasedDataConnector._
+  val config: Map[String, Any] = dcParam.getConfig
+  val database: String = config.getString(Database, DefaultDatabase)
+  val tableName: String = config.getString(TableName, EmptyString)
+  val fullTableName: String = s"$database.$tableName"
+  val whereString: String = config.getString(Where, EmptyString)
+  val url: String = config.getString(Url, EmptyString)
+  val user: String = config.getString(User, EmptyString)
+  val password: String = config.getString(Password, EmptyString)
+  val driver: String = config.getString(Driver, DefaultDriver)
+
+  require(url.nonEmpty, "JDBC connection: connection url is mandatory")
+  require(user.nonEmpty, "JDBC connection: user name is mandatory")
+  require(password.nonEmpty, "JDBC connection: password is mandatory")
+  require(tableName.nonEmpty, "JDBC connection: table is mandatory")
+  assert(isJDBCDriverLoaded(driver), s"JDBC driver ${driver} not present in classpath")
+
+  override def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val dfOpt = try {
+      val dtSql = createSqlStmt()
+      val prop = new java.util.Properties
+      prop.setProperty("user", user)
+      prop.setProperty("password", password)
+      prop.setProperty("driver", driver)
+      val df: DataFrame = sparkSession.read.jdbc(url, s"($dtSql) as t", prop)
+      val dfOpt = Some(df)
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    } catch {
+      case e: Throwable =>
+        error(s"loading table $fullTableName fails: ${e.getMessage}", e)
+        None
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
+
+  /**
+   * @return Return SQL statement with where condition if provided
+   */
+  private def createSqlStmt(): String = {
+    val tableClause = s"SELECT * FROM $fullTableName"
+    if (whereString.length > 0) {
+      s"$tableClause WHERE $whereString"
+    } else tableClause
+  }
+}
+
+object JDBCBasedDataConnector extends Loggable {
+  private val Database: String = "database"
+  private val TableName: String = "tablename"
+  private val Where: String = "where"
+  private val Url: String = "url"
+  private val User: String = "user"
+  private val Password: String = "password"
+  private val Driver: String = "driver"
+
+  private val DefaultDriver = "com.mysql.jdbc.Driver"
+  private val DefaultDatabase = "default"
+  private val EmptyString = ""
+
+  /**
+   * @param driver JDBC driver class name
+   * @return True if JDBC driver present in classpath
+   */
+  private def isJDBCDriverLoaded(driver: String): Boolean = {
+    try {
+      Class.forName(driver, false, this.getClass.getClassLoader)
+      true
+    } catch {
+      case x: ClassNotFoundException =>
+        griffinLogger.error(s"JDBC driver ${driver} provided is not found in class path")
+        false
+    }
+  }
+}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
index 05d5d87..31502b2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
@@ -24,6 +24,9 @@
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.utils.ParamUtil._
 
+@deprecated(
+  "This class is deprecated. Use 'org.apache.griffin.measure.datasource.connector.batch.JDBCBasedDataConnector'",
+  "0.6.0")
 case class MySqlDataConnector(
     @transient sparkSession: SparkSession,
     dcParam: DataConnectorParam,
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
new file mode 100644
index 0000000..e707ef7
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.griffin.measure.datasource.connector.batch
+
+import java.sql.DriverManager
+import java.util.Properties
+
+import org.scalatest.Matchers
+
+import org.apache.griffin.measure.SparkSuiteBase
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+
+class JDBCBasedDataConnectorTest extends SparkSuiteBase with Matchers {
+
+  val url = "jdbc:h2:mem:test"
+  var conn: java.sql.Connection = null
+  val properties = new Properties()
+  properties.setProperty("user", "user")
+  properties.setProperty("password", "password")
+  properties.setProperty("rowId", "false")
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    Class.forName("org.h2.Driver", false, this.getClass.getClassLoader)
+    conn = DriverManager.getConnection(url, properties)
+    conn.prepareStatement("create schema griffin").executeUpdate()
+
+    conn.prepareStatement("drop table if exists griffin.employee").executeUpdate()
+    conn
+      .prepareStatement(
+        "create table griffin.employee (name VARCHAR NOT NULL, id INTEGER NOT NULL)")
+      .executeUpdate()
+    conn.prepareStatement("insert into griffin.employee values ('emp1', 1)").executeUpdate()
+    conn.prepareStatement("insert into griffin.employee values ('emp2', 2)").executeUpdate()
+    conn.commit()
+  }
+
+  private final val dcParam =
+    DataConnectorParam("jdbc", "1", "test_df", Map.empty[String, String], Nil)
+  private final val timestampStorage = TimestampStorage()
+
+  "JDBC based data connector" should "be able to read data from relational database" in {
+    val configs = Map(
+      "database" -> "griffin",
+      "tablename" -> "employee",
+      "url" -> url,
+      "user" -> "user",
+      "password" -> "password",
+      "driver" -> "org.h2.Driver")
+    val dc = JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+    val result = dc.data(1000L)
+    assert(result._1.isDefined)
+    assert(result._1.get.collect().length == 2)
+  }
+
+  "JDBC based data connector" should "be able to read data from relational database with where condition" in {
+    val configs = Map(
+      "database" -> "griffin",
+      "tablename" -> "employee",
+      "url" -> url,
+      "user" -> "user",
+      "password" -> "password",
+      "driver" -> "org.h2.Driver",
+      "where" -> "id=1 and name='emp1'")
+    val dc = JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+    val result = dc.data(1000L)
+    assert(result._1.isDefined)
+    assert(result._1.get.collect().length == 1)
+  }
+
+  "JDBC data connector" should "have URL field in config" in {
+    the[java.lang.IllegalArgumentException] thrownBy {
+      val configs = Map(
+        "database" -> "griffin",
+        "tablename" -> "employee",
+        "user" -> "user",
+        "password" -> "password",
+        "driver" -> "org.h2.Driver")
+      JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+    } should have message "requirement failed: JDBC connection: connection url is mandatory"
+  }
+
+  "JDBC data connector" should "have table name field in config" in {
+    the[java.lang.IllegalArgumentException] thrownBy {
+      val configs = Map(
+        "database" -> "griffin",
+        "url" -> url,
+        "user" -> "user",
+        "password" -> "password",
+        "driver" -> "org.h2.Driver")
+      JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+    } should have message "requirement failed: JDBC connection: table is mandatory"
+  }
+
+  "JDBC data connector" should "have user name field in config" in {
+    the[java.lang.IllegalArgumentException] thrownBy {
+      val configs = Map(
+        "database" -> "griffin",
+        "url" -> url,
+        "tablename" -> "employee",
+        "password" -> "password",
+        "driver" -> "org.h2.Driver")
+      JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+    } should have message "requirement failed: JDBC connection: user name is mandatory"
+  }
+
+  "JDBC data connector" should "have table password field in config" in {
+    the[java.lang.IllegalArgumentException] thrownBy {
+      val configs = Map(
+        "database" -> "griffin",
+        "url" -> url,
+        "tablename" -> "employee",
+        "user" -> "user",
+        "driver" -> "org.h2.Driver")
+      JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+    } should have message "requirement failed: JDBC connection: password is mandatory"
+  }
+
+  "JDBC data connector" should "have driver provided in config in classpath" in {
+    the[AssertionError] thrownBy {
+      val configs = Map(
+        "database" -> "griffin",
+        "url" -> url,
+        "tablename" -> "employee",
+        "user" -> "user",
+        "password" -> "password",
+        "driver" -> "org.postgresql.Driver")
+      JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+    }
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    conn.close()
+  }
+}