blob: 66a30c1ffcececa13ee97b1f74c61a2348ba831a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.griffin.measure.datasource.connector.batch
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.scalatest._
import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.step.builder.ConstantColumns
class FileBasedDataConnectorTest extends SparkSuiteBase with Matchers {
override def beforeAll(): Unit = {
super.beforeAll()
createDataSets(s"file://${getClass.getResource("/").getPath}")
}
private def createDataSets(basePath: String): Unit = {
val formats = Seq("parquet", "orc", "csv", "tsv")
val schema = new StructType().add("name", StringType).add("age", IntegerType, nullable = true)
val df = spark.read.schema(schema).csv(s"${basePath}hive/person_table.csv")
df.cache()
formats.foreach(f => {
val delimiter = if (f.matches("csv")) "," else if (f.matches("tsv")) "\t" else ""
df.write
.mode(SaveMode.Overwrite)
.option("delimiter", delimiter)
.option("header", "true")
.format(if (f.matches("tsv")) "csv" else f)
.save(s"${basePath}files/person_table.$f")
})
df.unpersist()
}
private final val dcParam = DataConnectorParam("file", "1", "test_df", Map.empty[String, String], Nil)
private final val timestampStorage = TimestampStorage()
// Regarding Local FileSystem
"file based data connector" should "be able to read from local filesystem" in {
val configs = Map(
"format" -> "csv",
"paths" -> Seq(
s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
),
"options" -> Map(
"header" -> "false"
)
)
val dc = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
val result = dc.data(1000L)
assert(result._1.isDefined)
assert(result._1.get.collect().length == 2)
}
// Regarding User Defined Schema
it should "respect the provided schema, if any" in {
val configs = Map(
"format" -> "csv",
"paths" -> Seq(
s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
)
)
// no schema
assertThrows[IllegalArgumentException](
FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
)
// invalid schema
assertThrows[IllegalStateException](
FileBasedDataConnector(spark, dcParam.copy(config = configs + (("schema", ""))), timestampStorage)
)
// valid schema
val result1 = FileBasedDataConnector(spark,
dcParam.copy(config = configs + (("schema",
Seq(Map("name" -> "name", "type" -> "string"), Map("name" -> "age", "type" -> "int", "nullable" -> "true"))
)))
, timestampStorage)
.data(1L)
val expSchema = new StructType()
.add("name", StringType)
.add("age", IntegerType, nullable = true)
.add(ConstantColumns.tmst, LongType, nullable = false)
assert(result1._1.isDefined)
assert(result1._1.get.collect().length == 2)
assert(result1._1.get.schema == expSchema)
// valid headers
val result2 = FileBasedDataConnector(spark,
dcParam.copy(config = configs + (("options", Map(
"header" -> "true"
)
)))
, timestampStorage)
.data(1L)
assert(result2._1.isDefined)
assert(result2._1.get.collect().length == 1)
result2._1.get.columns should contain theSameElementsAs Seq("Joey", "14", ConstantColumns.tmst)
}
// skip on erroneous paths
it should "respect options if an erroneous path is encountered" in {
val configs = Map(
"format" -> "csv",
"paths" -> Seq(
s"file://${getClass.getResource("/hive/person_table.csv").getPath}",
s"${java.util.UUID.randomUUID().toString}/"
),
"skipErrorPaths" -> true,
"options" -> Map(
"header" -> "true"
)
)
// valid paths
val result1 = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
assert(result1._1.isDefined)
assert(result1._1.get.collect().length == 1)
// non existent path
assertThrows[IllegalArgumentException](
FileBasedDataConnector(spark, dcParam.copy(config = configs - "skipErrorPaths"), timestampStorage).data(1L)
)
// no path
assertThrows[AssertionError](
FileBasedDataConnector(spark, dcParam.copy(config = configs - "paths"), timestampStorage).data(1L)
)
}
// Regarding various formats
it should "be able to read all supported file types" in {
val formats = Seq("parquet", "orc", "csv", "tsv")
formats.map(f => {
val configs = Map(
"format" -> f,
"paths" -> Seq(
s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
),
"options" -> Map(
"header" -> "true",
"inferSchema" -> "true"
)
)
val result = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
assert(result._1.isDefined)
val df = result._1.get
val expSchema = new StructType()
.add("name", StringType)
.add("age", IntegerType, nullable = true)
.add(ConstantColumns.tmst, LongType, nullable = false)
assert(df.collect().length == 2)
assert(df.schema == expSchema)
})
}
it should "apply schema to all formats if provided" in {
val formats = Seq("parquet", "orc", "csv", "tsv")
formats.map(f => {
val configs = Map(
"format" -> f,
"paths" -> Seq(
s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
),
"options" -> Map(
"header" -> "true"
),
"schema" -> Seq(Map("name" -> "name", "type" -> "string"))
)
val result = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
assert(result._1.isDefined)
val df = result._1.get
val expSchema = new StructType()
.add("name", StringType)
.add(ConstantColumns.tmst, LongType, nullable = false)
assert(df.collect().length == 2)
assert(df.schema == expSchema)
})
}
}