/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.scalatest._

import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.step.builder.ConstantColumns
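
/**
 * Test suite for [[FileBasedDataConnector]].
 *
 * Covers reading from the local filesystem, user-defined schemas, handling of
 * erroneous paths, and all supported file formats (parquet, orc, csv, tsv).
 * Each test builds a connector config map with keys such as "format", "paths",
 * "options", "schema" and "skipErrorPaths".
 */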
class FileBasedDataConnectorTest extends SparkSuiteBase with Matchers {

  override def beforeAll(): Unit = {
    super.beforeAll()
    createDataSets(s"file://${getClass.getResource("/").getPath}")
  }
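
  /**
   * Reads the sample person table from the csv test resource once and writes it
   * back out in every supported format, so that the format-specific tests below
   * have data to read.
   */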
  private def createDataSets(basePath: String): Unit = {
    val formats = Seq("parquet", "orc", "csv", "tsv")
    val schema = new StructType().add("name", StringType).add("age", IntegerType, nullable = true)
    val df = spark.read.schema(schema).csv(s"${basePath}hive/person_table.csv")
    df.cache()
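
    // Spark has no native "tsv" source, so tsv data is written with the csv
    // writer and a tab delimiter.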
    formats.foreach(f => {
      val delimiter = if (f.matches("csv")) "," else if (f.matches("tsv")) "\t" else ""
      df.write
        .mode(SaveMode.Overwrite)
        .option("delimiter", delimiter)
        .option("header", "true")
        .format(if (f.matches("tsv")) "csv" else f)
        .save(s"${basePath}files/person_table.$f")
    })

    df.unpersist()
  }
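
  // Base connector parameters shared by every test; individual tests override
  // only the config map via copy(config = ...).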
  private final val dcParam =
    DataConnectorParam("file", "1", "test_df", Map.empty[String, String], Nil)
  private final val timestampStorage = TimestampStorage()

  // Regarding Local FileSystem
  "file based data connector" should "be able to read from local filesystem" in {
    val configs = Map(
      "format" -> "csv",
      "paths" -> Seq(
        s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
      ),
      "options" -> Map(
        "header" -> "false"
      )
    )
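
    // data() takes the batch timestamp as its argument; the first element of
    // the returned tuple holds the resulting DataFrame, if the read succeeded.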
    val dc = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
    val result = dc.data(1000L)

    assert(result._1.isDefined)
    assert(result._1.get.collect().length == 2)
  }

  // Regarding User Defined Schema
  it should "respect the provided schema, if any" in {
    val configs = Map(
      "format" -> "csv",
      "paths" -> Seq(
        s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
      )
    )

    // no schema (and no header option): the connector has no way to name columns
    assertThrows[IllegalArgumentException](
      FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
    )

    // invalid schema: an empty string is not a valid schema definition
    assertThrows[IllegalStateException](
      FileBasedDataConnector(spark, dcParam.copy(config = configs + (("schema", ""))), timestampStorage)
    )

    // valid schema
    val result1 = FileBasedDataConnector(
      spark,
      dcParam.copy(config = configs + (("schema", Seq(
        Map("name" -> "name", "type" -> "string"),
        Map("name" -> "age", "type" -> "int", "nullable" -> "true")
      )))),
      timestampStorage
    ).data(1L)

    // expected schema: the user-defined columns plus the internal tmst column
    // that the connector appends
    val expSchema = new StructType()
      .add("name", StringType)
      .add("age", IntegerType, nullable = true)
      .add(ConstantColumns.tmst, LongType, nullable = false)

    assert(result1._1.isDefined)
    assert(result1._1.get.collect().length == 2)
    assert(result1._1.get.schema == expSchema)

    // valid headers: with header=true and no schema, the first line of the file
    // is consumed as column names
    val result2 = FileBasedDataConnector(
      spark,
      dcParam.copy(config = configs + (("options", Map("header" -> "true")))),
      timestampStorage
    ).data(1L)

    assert(result2._1.isDefined)
    assert(result2._1.get.collect().length == 1)
    result2._1.get.columns should contain theSameElementsAs Seq("Joey", "14", ConstantColumns.tmst)
  }

  // Regarding Erroneous Paths
  it should "respect options if an erroneous path is encountered" in {
    val configs = Map(
      "format" -> "csv",
      "paths" -> Seq(
        s"file://${getClass.getResource("/hive/person_table.csv").getPath}",
        s"${java.util.UUID.randomUUID().toString}/"
      ),
      "skipErrorPaths" -> true,
      "options" -> Map(
        "header" -> "true"
      )
    )
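
    // with skipErrorPaths=true the nonexistent (random UUID) path is dropped
    // and only the valid path is read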
    val result1 = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
    assert(result1._1.isDefined)
    assert(result1._1.get.collect().length == 1)

    // without skipErrorPaths, a nonexistent path fails fast
    assertThrows[IllegalArgumentException](
      FileBasedDataConnector(spark, dcParam.copy(config = configs - "skipErrorPaths"), timestampStorage).data(1L)
    )

    // no paths at all
    assertThrows[AssertionError](
      FileBasedDataConnector(spark, dcParam.copy(config = configs - "paths"), timestampStorage).data(1L)
    )
  }

  // Regarding Various Formats
  it should "be able to read all supported file types" in {
    val formats = Seq("parquet", "orc", "csv", "tsv")
    formats.foreach(f => {
      val configs = Map(
        "format" -> f,
        "paths" -> Seq(
          s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
        ),
        "options" -> Map(
          "header" -> "true",
          "inferSchema" -> "true"
        )
      )
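
      // header and inferSchema let the csv/tsv readers recover the column names
      // and types; parquet and orc are self-describing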
      val result = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
      assert(result._1.isDefined)

      val df = result._1.get
      val expSchema = new StructType()
        .add("name", StringType)
        .add("age", IntegerType, nullable = true)
        .add(ConstantColumns.tmst, LongType, nullable = false)

      assert(df.collect().length == 2)
      assert(df.schema == expSchema)
    })
  }
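
  // When a user-defined schema is provided it takes precedence over the schema
  // carried by the format itself: only the listed columns (plus tmst) are kept,
  // even for self-describing formats like parquet and orc.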
it should "apply schema to all formats if provided" in {
val formats = Seq("parquet", "orc", "csv", "tsv")
formats.map(f => {
val configs = Map(
"format" -> f,
"paths" -> Seq(
s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
),
"options" -> Map(
"header" -> "true"
),
"schema" -> Seq(Map("name" -> "name", "type" -> "string"))
)
val result = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
assert(result._1.isDefined)
val df = result._1.get
val expSchema = new StructType()
.add("name", StringType)
.add(ConstantColumns.tmst, LongType, nullable = false)
assert(df.collect().length == 2)
assert(df.schema == expSchema)
})
}
}