/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.griffin.measure.datasource.connector.batch

import java.sql.DriverManager
import java.util.Properties

import org.scalatest.Matchers

import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
class JDBCBasedDataConnectorTest extends SparkSuiteBase with Matchers {

  val url = "jdbc:h2:mem:test"
  var conn: java.sql.Connection = _
  val properties = new Properties()
  properties.setProperty("user", "user")
  properties.setProperty("password", "password")
  properties.setProperty("rowId", "false")
  override def beforeAll(): Unit = {
    super.beforeAll()
    DriverManager.registerDriver(new org.h2.Driver)
    Class.forName("org.h2.Driver", false, this.getClass.getClassLoader)
    conn = DriverManager.getConnection(url, properties)
    conn.prepareStatement("create schema griffin").executeUpdate()
    conn.prepareStatement("drop table if exists griffin.employee").executeUpdate()
    conn
      .prepareStatement(
        "create table griffin.employee (name VARCHAR NOT NULL, id INTEGER NOT NULL)")
      .executeUpdate()
    conn.prepareStatement("insert into griffin.employee values ('emp1', 1)").executeUpdate()
    conn.prepareStatement("insert into griffin.employee values ('emp2', 2)").executeUpdate()
    conn.commit()
  }
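
  // Base connector parameters shared by all tests; each test copies them
  // with its own config map.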
  private final val dcParam =
    DataConnectorParam("jdbc", "test_df", Map.empty[String, String], Nil)
  private final val timestampStorage = TimestampStorage()
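
  // data() returns a tuple whose first element is the optional DataFrame of
  // rows loaded from the table.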
"JDBC based data connector" should "be able to read data from relational database" in {
val configs = Map(
"database" -> "griffin",
"tablename" -> "employee",
"url" -> url,
"user" -> "user",
"password" -> "password",
"driver" -> "org.h2.Driver")
val dc = JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
val result = dc.data(1000L)
assert(result._1.isDefined)
assert(result._1.get.collect().length == 2)
}
"JDBC based data connector" should "be able to read data from relational database with where condition" in {
val configs = Map(
"database" -> "griffin",
"tablename" -> "employee",
"url" -> url,
"user" -> "user",
"password" -> "password",
"driver" -> "org.h2.Driver",
"where" -> "id=1 and name='emp1'")
val dc = JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
val result = dc.data(1000L)
assert(result._1.isDefined)
assert(result._1.get.collect().length == 1)
}
"JDBC data connector" should "have URL field in config" in {
the[java.lang.IllegalArgumentException] thrownBy {
val configs = Map(
"database" -> "griffin",
"tablename" -> "employee",
"user" -> "user",
"password" -> "password",
"driver" -> "org.h2.Driver")
JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
} should have message "requirement failed: JDBC connection: connection url is mandatory"
}
"JDBC data connector" should "have table name field in config" in {
the[java.lang.IllegalArgumentException] thrownBy {
val configs = Map(
"database" -> "griffin",
"url" -> url,
"user" -> "user",
"password" -> "password",
"driver" -> "org.h2.Driver")
JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
} should have message "requirement failed: JDBC connection: table is mandatory"
}
"JDBC data connector" should "have user name field in config" in {
the[java.lang.IllegalArgumentException] thrownBy {
val configs = Map(
"database" -> "griffin",
"url" -> url,
"tablename" -> "employee",
"password" -> "password",
"driver" -> "org.h2.Driver")
JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
} should have message "requirement failed: JDBC connection: user name is mandatory"
}
"JDBC data connector" should "have table password field in config" in {
the[java.lang.IllegalArgumentException] thrownBy {
val configs = Map(
"database" -> "griffin",
"url" -> url,
"tablename" -> "employee",
"user" -> "user",
"driver" -> "org.h2.Driver")
JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
} should have message "requirement failed: JDBC connection: password is mandatory"
}
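
  // org.postgresql.Driver is deliberately absent from the test classpath,
  // so constructing the connector should fail with an AssertionError.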
"JDBC data connector" should "have driver provided in config in classpath" in {
the[AssertionError] thrownBy {
val configs = Map(
"database" -> "griffin",
"url" -> url,
"tablename" -> "employee",
"user" -> "user",
"password" -> "password",
"driver" -> "org.postgresql.Driver")
JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
}
}
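
  // Close the JDBC connection after the suite; the in-memory H2 database
  // goes away with it.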
  override def afterAll(): Unit = {
    super.afterAll()
    conn.close()
  }
}