[GRIFFIN-297] Allow support for additional file based data sources
**What changes were proposed in this pull request?**
This PR extends the current support, which covers only Avro and text, to various other file-based data sources (Parquet, ORC, etc.).
- Allows users to specify additional file-based data sources like Parquet, CSV, TSV, ORC, etc.
- Allows data to be read directly from stand-alone files as well as from directories, in both local and distributed file systems.
- Allows users to specify a schema directly through the connector config (useful for CSV/TSV formats); see the second sample config below.
A sample config looks like this:
```
{
"name": "source",
"baseline": true,
"connectors": [
{
"type": "file",
"version": "1.7",
"config": {
"format": "parquet",
"options": {
"k1": "v1",
"k2": "v2"
},
"paths": [
"/home/chitral/path/to/source/",
"/home/chitral/path/to/test.parquet"
]
}
}
]
}
```
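For delimited formats like CSV/TSV, a schema and the error-handling behaviour can also be set on the connector. A hypothetical variant of the connector entry above (paths and column names are illustrative only) could look like:
```
{
  "type": "file",
  "version": "1.7",
  "config": {
    "format": "csv",
    "paths": [
      "/path/to/users.csv"
    ],
    "options": {
      "header": "false"
    },
    "skipErrorPaths": true,
    "schema": [
      { "name": "user_id", "type": "string", "nullable": "true" },
      { "name": "age", "type": "int", "nullable": "false" }
    ]
  }
}
```
When `schema` is provided, it takes precedence over header-based inference; for CSV, if neither `schema` nor the `header` option is set, the connector fails with an error.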
**Does this PR introduce any user-facing change?**
No
**How was this patch tested?**
Griffin test suite. Additional unit tests have also been added.
Author: chitralverma <chitralverma@gmail.com>
Closes #555 from chitralverma/allow_file_based_batch_connectors.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index a1ef3ba..371fb7b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -35,6 +35,7 @@
val HiveRegex = """^(?i)hive$""".r
val AvroRegex = """^(?i)avro$""".r
+ val FileRegex = """^(?i)file$""".r
val TextDirRegex = """^(?i)text-dir$""".r
val KafkaRegex = """^(?i)kafka$""".r
@@ -62,6 +63,7 @@
conType match {
case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
+ case FileRegex() => FileBasedDataConnector(sparkSession, dcParam, tmstCache)
case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
case CustomRegex() => getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
case KafkaRegex() =>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
new file mode 100644
index 0000000..f0a000c
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
@@ -0,0 +1,209 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+ */
+package org.apache.griffin.measure.datasource.connector.batch
+
+import scala.collection.mutable.{Map => MutableMap}
+import scala.util.{Success, Try}
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * A batch data connector for file-based sources which supports various
+ * file-based data sources like Parquet, CSV, TSV, ORC, etc.
+ * Local files can also be read by prepending `file://` namespace.
+ *
+ * Currently supported formats include Parquet, ORC, Avro, Text, and delimited types like CSV and TSV.
+ *
+ * Supported Configurations:
+ * - format : [[String]] specifying the type of file source (parquet, orc, etc.).
+ * - paths : [[Seq]] specifying the paths to be read
+ * - options : [[Map]] of format specific options
+ * - skipErrorPaths : [[Boolean]] specifying whether to continue execution if one or more paths are invalid.
+ * - schema : [[Seq]] of {colName, colType and isNullable} given as key value pairs. If provided, this can
+ * help skip the schema inference step for some underlying data sources.
+ *
+ * Some defaults assumed by this connector (if not set) are as follows:
+ * - `delimiter` is \t for TSV format,
+ * - `schema` is None,
+ * - `header` is false,
+ * - `format` is parquet
+ */
+case class FileBasedDataConnector(@transient sparkSession: SparkSession,
+ dcParam: DataConnectorParam,
+ timestampStorage: TimestampStorage)
+ extends BatchDataConnector {
+
+ import FileBasedDataConnector._
+
+ val config: Map[String, Any] = dcParam.getConfig
+ var options: MutableMap[String, String] = MutableMap(config.getParamStringMap(Options, Map.empty).toSeq: _*)
+
+ var format: String = config.getString(Format, DefaultFormat).toLowerCase
+ val paths: Seq[String] = config.getStringArr(Paths, Nil)
+ val schemaSeq: Seq[Map[String, String]] = config.getAnyRef[Seq[Map[String, String]]](Schema, Nil)
+ val skipErrorPaths: Boolean = config.getBoolean(SkipErrorPaths, defValue = false)
+
+ val currentSchema: Option[StructType] = Try(getUserDefinedSchema) match {
+ case Success(structType) if structType.fields.nonEmpty => Some(structType)
+ case _ => None
+ }
+
+ assert(SupportedFormats.contains(format),
+ s"Invalid format '$format' specified. Must be one of ${SupportedFormats.mkString("['", "', '", "']")}")
+
+ if (format == "csv") validateCSVOptions()
+ if (format == "tsv") {
+ format = "csv"
+ options.getOrElseUpdate(Delimiter, TabDelimiter)
+ }
+
+ /**
+ * Builds a [[StructType]] from the schema specification provided via the `Schema` config.
+ *
+ * @example
+ * {"schema":[{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]}
+ * {"schema":[{"name":"user_id","type":"decimal(5,2)","nullable":"true"}]}
+ * {"schema":[{"name":"my_struct","type":"struct<f1:int,f2:string>","nullable":"true"}]}
+ * @return
+ */
+ private def getUserDefinedSchema: StructType = {
+ schemaSeq.foldLeft(new StructType())((currentStruct, fieldMap) => {
+ val colName = fieldMap(ColName).toLowerCase
+ val colType = fieldMap(ColType).toLowerCase
+ val isNullable = Try(fieldMap(IsNullable).toLowerCase.toBoolean).getOrElse(true)
+
+ currentStruct.add(colName, colType, isNullable)
+ })
+ }
+
+ /**
+ * Ensures the presence of schema either via `header` or `schema` options.
+ *
+ * - If both are present, preference is given to `schema`. The first row is skipped
+ * if `header` is set to true, otherwise it is included.
+ * - If `schema` is defined, it must be valid.
+ * - If neither is set, a fatal exception is thrown.
+ */
+ private def validateCSVOptions(): Unit = {
+ if (options.contains(Header) && config.contains(Schema)) {
+ griffinLogger.warn(s"Both $Options.$Header and $Schema were provided. Defaulting to provided $Schema")
+ }
+
+ if (!options.contains(Header) && !config.contains(Schema)) {
+ throw new IllegalArgumentException(s"Either '$Header' must be set in '$Options' or '$Schema' must be set.")
+ }
+
+ if (config.contains(Schema) && (schemaSeq.isEmpty || currentSchema.isEmpty)) {
+ throw new IllegalStateException("Unable to create schema from specification")
+
+ }
+ }
+
+ def data(ms: Long): (Option[DataFrame], TimeRange) = {
+ val validPaths = getValidPaths(paths, skipErrorPaths)
+
+ val dfOpt = {
+ val dfOpt = Some(
+ sparkSession.read
+ .options(options)
+ .format(format)
+ .withSchemaIfAny(currentSchema)
+ .load(validPaths: _*)
+
+ )
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ }
+
+ (dfOpt, TimeRange(ms, readTmst(ms)))
+ }
+}
+
+object FileBasedDataConnector extends Loggable {
+ private val Format: String = "format"
+ private val Paths: String = "paths"
+ private val Options: String = "options"
+ private val SkipErrorPaths: String = "skipErrorPaths"
+ private val Schema: String = "schema"
+ private val Header: String = "header"
+ private val Delimiter: String = "delimiter"
+
+ private val ColName: String = "name"
+ private val ColType: String = "type"
+ private val IsNullable: String = "nullable"
+ private val TabDelimiter: String = "\t"
+
+ private val DefaultFormat: String = SQLConf.DEFAULT_DATA_SOURCE_NAME.defaultValueString
+ private val SupportedFormats: Seq[String] = Seq("parquet", "orc", "avro", "text", "csv", "tsv")
+
+ /**
+ * Validates the existence of paths in a given sequence.
+ * Set the `skipErrorPaths` config to true to avoid fatal errors if any erroneous paths are encountered.
+ *
+ * @param paths given sequence of paths
+ * @param skipOnError flag to skip erroneous paths if any
+ * @return
+ */
+ private def getValidPaths(paths: Seq[String], skipOnError: Boolean): Seq[String] = {
+ val validPaths = paths.filter(path =>
+ if (HdfsUtil.existPath(path)) true
+ else {
+ val msg = s"Path '$path' does not exist!"
+ if (skipOnError) griffinLogger.error(msg)
+ else throw new IllegalArgumentException(msg)
+
+ false
+ }
+ )
+
+ assert(validPaths.nonEmpty, "No valid paths were provided for the data source.")
+ validPaths
+ }
+
+ /**
+ * Adds methods implicitly to [[DataFrameReader]]
+ *
+ * @param dfr an instance of [[DataFrameReader]]
+ */
+ implicit class Implicits(dfr: DataFrameReader) {
+
+ /**
+ * Applies a schema to this [[DataFrameReader]] if any.
+ *
+ * @param schemaOpt an optional Schema
+ * @return
+ */
+ def withSchemaIfAny(schemaOpt: Option[StructType]): DataFrameReader = {
+ schemaOpt match {
+ case Some(structType) => dfr.schema(structType)
+ case None => dfr
+ }
+ }
+ }
+
+}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
new file mode 100644
index 0000000..bf881bb
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
@@ -0,0 +1,223 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
+import org.scalatest._
+
+import org.apache.griffin.measure.SparkSuiteBase
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.step.builder.ConstantColumns
+
+class FileBasedDataConnectorTest extends SparkSuiteBase with Matchers {
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ createDataSets(s"file://${getClass.getResource("/").getPath}")
+ }
+
+ private def createDataSets(basePath: String): Unit = {
+ val formats = Seq("parquet", "orc", "csv", "tsv")
+ val schema = new StructType().add("name", StringType).add("age", IntegerType, nullable = true)
+
+ val df = spark.read.schema(schema).csv(s"${basePath}hive/person_table.csv")
+
+ df.cache()
+ formats.foreach(f => {
+ val delimiter = if (f.matches("csv")) "," else if (f.matches("tsv")) "\t" else ""
+ df.write
+ .mode(SaveMode.Overwrite)
+ .option("delimiter", delimiter)
+ .option("header", "true")
+ .format(if (f.matches("tsv")) "csv" else f)
+ .save(s"${basePath}files/person_table.$f")
+ })
+
+ df.unpersist()
+ }
+
+ private final val dcParam = DataConnectorParam("file", "1", "test_df", Map.empty[String, String], Nil)
+ private final val timestampStorage = TimestampStorage()
+
+ // Regarding Local FileSystem
+
+ "file based data connector" should "be able to read from local filesystem" in {
+ val configs = Map(
+ "format" -> "csv",
+ "paths" -> Seq(
+ s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
+ ),
+ "options" -> Map(
+ "header" -> "false"
+ )
+ )
+
+ val dc = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+ val result = dc.data(1000L)
+
+ assert(result._1.isDefined)
+ assert(result._1.get.collect().length == 2)
+ }
+
+ // Regarding User Defined Schema
+
+ it should "respect the provided schema, if any" in {
+ val configs = Map(
+ "format" -> "csv",
+ "paths" -> Seq(
+ s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
+ )
+ )
+
+ // no schema
+ assertThrows[IllegalArgumentException](
+ FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+ )
+
+ // invalid schema
+ assertThrows[IllegalStateException](
+ FileBasedDataConnector(spark, dcParam.copy(config = configs + (("schema", ""))), timestampStorage)
+ )
+
+ // valid schema
+ val result1 = FileBasedDataConnector(spark,
+ dcParam.copy(config = configs + (("schema",
+ Seq(Map("name" -> "name", "type" -> "string"), Map("name" -> "age", "type" -> "int", "nullable" -> "true"))
+ )))
+ , timestampStorage)
+ .data(1L)
+
+ val expSchema = new StructType()
+ .add("name", StringType)
+ .add("age", IntegerType, nullable = true)
+ .add(ConstantColumns.tmst, LongType, nullable = false)
+
+ assert(result1._1.isDefined)
+ assert(result1._1.get.collect().length == 2)
+ assert(result1._1.get.schema == expSchema)
+
+ // valid headers
+ val result2 = FileBasedDataConnector(spark,
+ dcParam.copy(config = configs + (("options", Map(
+ "header" -> "true"
+ )
+ )))
+ , timestampStorage)
+ .data(1L)
+
+ assert(result2._1.isDefined)
+ assert(result2._1.get.collect().length == 1)
+ result2._1.get.columns should contain theSameElementsAs Seq("Joey", "14", ConstantColumns.tmst)
+ }
+
+ // Regarding erroneous paths
+
+ it should "respect options if an erroneous path is encountered" in {
+ val configs = Map(
+ "format" -> "csv",
+ "paths" -> Seq(
+ s"file://${getClass.getResource("/hive/person_table.csv").getPath}",
+ s"${java.util.UUID.randomUUID().toString}/"
+ ),
+ "skipErrorPaths" -> true,
+ "options" -> Map(
+ "header" -> "true"
+ )
+ )
+
+ // valid paths
+ val result1 = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
+
+ assert(result1._1.isDefined)
+ assert(result1._1.get.collect().length == 1)
+
+ // non existent path
+ assertThrows[IllegalArgumentException](
+ FileBasedDataConnector(spark, dcParam.copy(config = configs - "skipErrorPaths"), timestampStorage).data(1L)
+ )
+
+ // no path
+ assertThrows[AssertionError](
+ FileBasedDataConnector(spark, dcParam.copy(config = configs - "paths"), timestampStorage).data(1L)
+ )
+ }
+
+ // Regarding various formats
+ it should "be able to read all supported file types" in {
+
+ val formats = Seq("parquet", "orc", "csv", "tsv")
+ formats.map(f => {
+ val configs = Map(
+ "format" -> f,
+ "paths" -> Seq(
+ s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
+ ),
+ "options" -> Map(
+ "header" -> "true",
+ "inferSchema" -> "true"
+ )
+ )
+
+ val result = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
+
+ assert(result._1.isDefined)
+
+ val df = result._1.get
+ val expSchema = new StructType()
+ .add("name", StringType)
+ .add("age", IntegerType, nullable = true)
+ .add(ConstantColumns.tmst, LongType, nullable = false)
+
+ assert(df.collect().length == 2)
+ assert(df.schema == expSchema)
+ })
+ }
+
+ it should "apply schema to all formats if provided" in {
+ val formats = Seq("parquet", "orc", "csv", "tsv")
+ formats.map(f => {
+ val configs = Map(
+ "format" -> f,
+ "paths" -> Seq(
+ s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
+ ),
+ "options" -> Map(
+ "header" -> "true"
+ ),
+ "schema" -> Seq(Map("name" -> "name", "type" -> "string"))
+ )
+
+ val result = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
+
+ assert(result._1.isDefined)
+
+ val df = result._1.get
+ val expSchema = new StructType()
+ .add("name", StringType)
+ .add(ConstantColumns.tmst, LongType, nullable = false)
+
+ assert(df.collect().length == 2)
+ assert(df.schema == expSchema)
+ })
+ }
+
+}