[GRIFFIN-297] Allow support for additional file based data sources

**What changes were proposed in this pull request?**

The PR extends the current support beyond just Avro and Text for various file based data sources (Parquet, ORC, etc).

 - Allows users to specify additional file based data sources like Parquet, CSV, TSV, ORC etc.
 - Allows data to be read directly from stand-alone files as well as directories present in both local/ distributed file systems.
 - Allows users to specify schema directly through options (useful for CSV/ TSV types).

A sample config looks like,

```
{
  "name": "source",
  "baseline": true,
  "connectors": [
    {
      "type": "file",
      "version": "1.7",
      "config": {
        "format": "parquet",
        "options": {
          "k1": "v1",
          "k2": "v2"
        },
        "paths": [
          "/home/chitral/path/to/source/",
          "/home/chitral/path/to/test.parquet"
        ]
      }
    }
  ]
}

```
**Does this PR introduce any user-facing change?**
No

**How was this patch tested?**
Griffin test suite. Some additional unit test has also been added.

Author: chitralverma <chitralverma@gmail.com>

Closes #555 from chitralverma/allow_file_based_batch_connectors.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index a1ef3ba..371fb7b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -35,6 +35,7 @@
 
   val HiveRegex = """^(?i)hive$""".r
   val AvroRegex = """^(?i)avro$""".r
+  val FileRegex = """^(?i)file$""".r
   val TextDirRegex = """^(?i)text-dir$""".r
 
   val KafkaRegex = """^(?i)kafka$""".r
@@ -62,6 +63,7 @@
       conType match {
         case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
         case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
+        case FileRegex() => FileBasedDataConnector(sparkSession, dcParam, tmstCache)
         case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
         case CustomRegex() => getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
         case KafkaRegex() =>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
new file mode 100644
index 0000000..f0a000c
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
@@ -0,0 +1,209 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+ */
+package org.apache.griffin.measure.datasource.connector.batch
+
+import scala.collection.mutable.{Map => MutableMap}
+import scala.util.{Success, Try}
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * A batch data connector for file based sources which allows support various
+ * file based data sources like Parquet, CSV, TSV, ORC etc.
+ * Local files can also be read by prepending `file://` namespace.
+ *
+ * Currently supported formats like Parquet, ORC, AVRO, Text and Delimited types like CSV, TSV etc.
+ *
+ * Supported Configurations:
+ *  - format : [[String]] specifying the type of file source (parquet, orc, etc.).
+ *  - paths : [[Seq]] specifying the paths to be read
+ *  - options : [[Map]] of format specific options
+ *  - skipOnError : [[Boolean]] specifying where to continue execution if one or more paths are invalid.
+ *  - schema : [[Seq]] of {colName, colType and isNullable} given as key value pairs. If provided, this can
+ * help skip the schema inference step for some underlying data sources.
+ *
+ * Some defaults assumed by this connector (if not set) are as follows:
+ *  - `delimiter` is \t for TSV format,
+ *  - `schema` is None,
+ *  - `header` is false,
+ *  - `format` is parquet
+ */
+case class FileBasedDataConnector(@transient sparkSession: SparkSession,
+                                  dcParam: DataConnectorParam,
+                                  timestampStorage: TimestampStorage)
+  extends BatchDataConnector {
+
+  import FileBasedDataConnector._
+
+  val config: Map[String, Any] = dcParam.getConfig
+  var options: MutableMap[String, String] = MutableMap(config.getParamStringMap(Options, Map.empty).toSeq: _*)
+
+  var format: String = config.getString(Format, DefaultFormat).toLowerCase
+  val paths: Seq[String] = config.getStringArr(Paths, Nil)
+  val schemaSeq: Seq[Map[String, String]] = config.getAnyRef[Seq[Map[String, String]]](Schema, Nil)
+  val skipErrorPaths: Boolean = config.getBoolean(SkipErrorPaths, defValue = false)
+
+  val currentSchema: Option[StructType] = Try(getUserDefinedSchema) match {
+    case Success(structType) if structType.fields.nonEmpty => Some(structType)
+    case _ => None
+  }
+
+  assert(SupportedFormats.contains(format),
+    s"Invalid format '$format' specified. Must be one of ${SupportedFormats.mkString("['", "', '", "']")}")
+
+  if (format == "csv") validateCSVOptions()
+  if (format == "tsv") {
+    format = "csv"
+    options.getOrElseUpdate(Delimiter, TabDelimiter)
+  }
+
+  /**
+   * Builds a [[StructType]] from the given schema string provided as `Schema` config.
+   *
+   * @example
+   * {"schema":[{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]}
+   * {"schema":[{"name":"user_id","type":"decimal(5,2)","nullable":"true"}]}
+   * {"schema":[{"name":"my_struct","type":"struct<f1:int,f2:string>","nullable":"true"}]}
+   * @return
+   */
+  private def getUserDefinedSchema: StructType = {
+    schemaSeq.foldLeft(new StructType())((currentStruct, fieldMap) => {
+      val colName = fieldMap(ColName).toLowerCase
+      val colType = fieldMap(ColType).toLowerCase
+      val isNullable = Try(fieldMap(IsNullable).toLowerCase.toBoolean).getOrElse(true)
+
+      currentStruct.add(colName, colType, isNullable)
+    })
+  }
+
+  /**
+   * Ensures the presence of schema either via `header` or `schema` options.
+   *
+   *  - If both are present, the preference will be given to `schema`. First row will be omitted
+   * if `header` is set to true, else will be included.
+   *  - If `schema` is defined, it must be valid.
+   *  - If neither is set, a fatal exception is thrown.
+   */
+  private def validateCSVOptions(): Unit = {
+    if (options.contains(Header) && config.contains(Schema)) {
+      griffinLogger.warn(s"Both $Options.$Header and $Schema were provided. Defaulting to provided $Schema")
+    }
+
+    if (!options.contains(Header) && !config.contains(Schema)) {
+      throw new IllegalArgumentException(s"Either '$Header' must be set in '$Options' or '$Schema' must be set.")
+    }
+
+    if (config.contains(Schema) && (schemaSeq.isEmpty || currentSchema.isEmpty)) {
+      throw new IllegalStateException("Unable to create schema from specification")
+
+    }
+  }
+
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val validPaths = getValidPaths(paths, skipErrorPaths)
+
+    val dfOpt = {
+      val dfOpt = Some(
+        sparkSession.read
+          .options(options)
+          .format(format)
+          .withSchemaIfAny(currentSchema)
+          .load(validPaths: _*)
+
+      )
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    }
+
+    (dfOpt, TimeRange(ms, readTmst(ms)))
+  }
+}
+
+object FileBasedDataConnector extends Loggable {
+  private val Format: String = "format"
+  private val Paths: String = "paths"
+  private val Options: String = "options"
+  private val SkipErrorPaths: String = "skipErrorPaths"
+  private val Schema: String = "schema"
+  private val Header: String = "header"
+  private val Delimiter: String = "delimiter"
+
+  private val ColName: String = "name"
+  private val ColType: String = "type"
+  private val IsNullable: String = "nullable"
+  private val TabDelimiter: String = "\t"
+
+  private val DefaultFormat: String = SQLConf.DEFAULT_DATA_SOURCE_NAME.defaultValueString
+  private val SupportedFormats: Seq[String] = Seq("parquet", "orc", "avro", "text", "csv", "tsv")
+
+  /**
+   * Validates the existence of paths in a given sequence.
+   * Set option `skipOnError` to true to avoid fatal errors if any erroneous paths are encountered.
+   *
+   * @param paths       given sequence of paths
+   * @param skipOnError flag to skip erroneous paths if any
+   * @return
+   */
+  private def getValidPaths(paths: Seq[String], skipOnError: Boolean): Seq[String] = {
+    val validPaths = paths.filter(path =>
+      if (HdfsUtil.existPath(path)) true
+      else {
+        val msg = s"Path '$path' does not exist!"
+        if (skipOnError) griffinLogger.error(msg)
+        else throw new IllegalArgumentException(msg)
+
+        false
+      }
+    )
+
+    assert(validPaths.nonEmpty, "No paths were given for the data source.")
+    validPaths
+  }
+
+  /**
+   * Adds methods implicitly to [[DataFrameReader]]
+   *
+   * @param dfr an instance of [[DataFrameReader]]
+   */
+  implicit class Implicits(dfr: DataFrameReader) {
+
+    /**
+     * Applies a schema to this [[DataFrameReader]] if any.
+     *
+     * @param schemaOpt an optional Schema
+     * @return
+     */
+    def withSchemaIfAny(schemaOpt: Option[StructType]): DataFrameReader = {
+      schemaOpt match {
+        case Some(structType) => dfr.schema(structType)
+        case None => dfr
+      }
+    }
+  }
+
+}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
new file mode 100644
index 0000000..bf881bb
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
@@ -0,0 +1,223 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
+import org.scalatest._
+
+import org.apache.griffin.measure.SparkSuiteBase
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.step.builder.ConstantColumns
+
+class FileBasedDataConnectorTest extends SparkSuiteBase with Matchers {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    createDataSets(s"file://${getClass.getResource("/").getPath}")
+  }
+
+  private def createDataSets(basePath: String): Unit = {
+    val formats = Seq("parquet", "orc", "csv", "tsv")
+    val schema = new StructType().add("name", StringType).add("age", IntegerType, nullable = true)
+
+    val df = spark.read.schema(schema).csv(s"${basePath}hive/person_table.csv")
+
+    df.cache()
+    formats.foreach(f => {
+      val delimiter = if (f.matches("csv")) "," else if (f.matches("tsv")) "\t" else ""
+      df.write
+        .mode(SaveMode.Overwrite)
+        .option("delimiter", delimiter)
+        .option("header", "true")
+        .format(if (f.matches("tsv")) "csv" else f)
+        .save(s"${basePath}files/person_table.$f")
+    })
+
+    df.unpersist()
+  }
+
+  private final val dcParam = DataConnectorParam("file", "1", "test_df", Map.empty[String, String], Nil)
+  private final val timestampStorage = TimestampStorage()
+
+  // Regarding Local FileSystem
+
+  "file based data connector" should "be able to read from local filesystem" in {
+    val configs = Map(
+      "format" -> "csv",
+      "paths" -> Seq(
+        s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
+      ),
+      "options" -> Map(
+        "header" -> "false"
+      )
+    )
+
+    val dc = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+    val result = dc.data(1000L)
+
+    assert(result._1.isDefined)
+    assert(result._1.get.collect().length == 2)
+  }
+
+  // Regarding User Defined Schema
+
+  it should "respect the provided schema, if any" in {
+    val configs = Map(
+      "format" -> "csv",
+      "paths" -> Seq(
+        s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
+      )
+    )
+
+    // no schema
+    assertThrows[IllegalArgumentException](
+      FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+    )
+
+    // invalid schema
+    assertThrows[IllegalStateException](
+      FileBasedDataConnector(spark, dcParam.copy(config = configs + (("schema", ""))), timestampStorage)
+    )
+
+    // valid schema
+    val result1 = FileBasedDataConnector(spark,
+      dcParam.copy(config = configs + (("schema",
+        Seq(Map("name" -> "name", "type" -> "string"), Map("name" -> "age", "type" -> "int", "nullable" -> "true"))
+      )))
+      , timestampStorage)
+      .data(1L)
+
+    val expSchema = new StructType()
+      .add("name", StringType)
+      .add("age", IntegerType, nullable = true)
+      .add(ConstantColumns.tmst, LongType, nullable = false)
+
+    assert(result1._1.isDefined)
+    assert(result1._1.get.collect().length == 2)
+    assert(result1._1.get.schema == expSchema)
+
+    // valid headers
+    val result2 = FileBasedDataConnector(spark,
+      dcParam.copy(config = configs + (("options", Map(
+        "header" -> "true"
+      )
+      )))
+      , timestampStorage)
+      .data(1L)
+
+    assert(result2._1.isDefined)
+    assert(result2._1.get.collect().length == 1)
+    result2._1.get.columns should contain theSameElementsAs Seq("Joey", "14", ConstantColumns.tmst)
+  }
+
+  // skip on erroneous paths
+
+  it should "respect options if an erroneous path is encountered" in {
+    val configs = Map(
+      "format" -> "csv",
+      "paths" -> Seq(
+        s"file://${getClass.getResource("/hive/person_table.csv").getPath}",
+        s"${java.util.UUID.randomUUID().toString}/"
+      ),
+      "skipErrorPaths" -> true,
+      "options" -> Map(
+        "header" -> "true"
+      )
+    )
+
+    // valid paths
+    val result1 = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
+
+    assert(result1._1.isDefined)
+    assert(result1._1.get.collect().length == 1)
+
+    // non existent path
+    assertThrows[IllegalArgumentException](
+      FileBasedDataConnector(spark, dcParam.copy(config = configs - "skipErrorPaths"), timestampStorage).data(1L)
+    )
+
+    // no path
+    assertThrows[AssertionError](
+      FileBasedDataConnector(spark, dcParam.copy(config = configs - "paths"), timestampStorage).data(1L)
+    )
+  }
+
+  // Regarding various formats
+  it should "be able to read all supported file types" in {
+
+    val formats = Seq("parquet", "orc", "csv", "tsv")
+    formats.map(f => {
+      val configs = Map(
+        "format" -> f,
+        "paths" -> Seq(
+          s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
+        ),
+        "options" -> Map(
+          "header" -> "true",
+          "inferSchema" -> "true"
+        )
+      )
+
+      val result = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
+
+      assert(result._1.isDefined)
+
+      val df = result._1.get
+      val expSchema = new StructType()
+        .add("name", StringType)
+        .add("age", IntegerType, nullable = true)
+        .add(ConstantColumns.tmst, LongType, nullable = false)
+
+      assert(df.collect().length == 2)
+      assert(df.schema == expSchema)
+    })
+  }
+
+  it should "apply schema to all formats if provided" in {
+    val formats = Seq("parquet", "orc", "csv", "tsv")
+    formats.map(f => {
+      val configs = Map(
+        "format" -> f,
+        "paths" -> Seq(
+          s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
+        ),
+        "options" -> Map(
+          "header" -> "true"
+        ),
+        "schema" -> Seq(Map("name" -> "name", "type" -> "string"))
+      )
+
+      val result = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
+
+      assert(result._1.isDefined)
+
+      val df = result._1.get
+      val expSchema = new StructType()
+        .add("name", StringType)
+        .add(ConstantColumns.tmst, LongType, nullable = false)
+
+      assert(df.collect().length == 2)
+      assert(df.schema == expSchema)
+    })
+  }
+
+}