/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.datasource.connector.batch
import scala.collection.mutable.{Map => MutableMap}
import scala.util.{Success, Try}
import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.HdfsUtil
import org.apache.griffin.measure.utils.ParamUtil._
/**
* A batch data connector for file based sources, which supports various
* file based data sources like Parquet, CSV, TSV, ORC etc.
* Local files can also be read by prepending the `file://` namespace.
*
* Currently supported formats are Parquet, ORC, AVRO and Text, along with delimited types like CSV and TSV.
*
* Supported Configurations:
* - format : [[String]] specifying the type of file source (parquet, orc, etc.).
* - paths : [[Seq]] specifying the paths to be read
* - options : [[Map]] of format specific options
* - skipErrorPaths : [[Boolean]] specifying whether to continue execution if one or more paths are invalid.
* - schema : [[Seq]] of {colName, colType and isNullable} given as key value pairs. If provided, this can
* help skip the schema inference step for some underlying data sources.
*
* Some defaults assumed by this connector (if not set) are as follows:
* - `delimiter` is \t for TSV format,
* - `schema` is None,
* - `header` is false,
* - `format` is parquet
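*
* @example A sample configuration reading CSV files with an explicit schema (the paths and column names
* below are purely illustrative):
* {{{
*   {
*     "format": "csv",
*     "paths": ["hdfs:///path/to/csv/dir"],
*     "options": {"delimiter": ","},
*     "schema": [
*       {"name": "user_id", "type": "string", "nullable": "true"},
*       {"name": "age", "type": "int", "nullable": "false"}
*     ]
*   }
* }}}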
*/
case class FileBasedDataConnector(@transient sparkSession: SparkSession,
dcParam: DataConnectorParam,
timestampStorage: TimestampStorage)
extends BatchDataConnector {
import FileBasedDataConnector._
val config: Map[String, Any] = dcParam.getConfig
var options: MutableMap[String, String] = MutableMap(config.getParamStringMap(Options, Map.empty).toSeq: _*)
var format: String = config.getString(Format, DefaultFormat).toLowerCase
val paths: Seq[String] = config.getStringArr(Paths, Nil)
val schemaSeq: Seq[Map[String, String]] = config.getAnyRef[Seq[Map[String, String]]](Schema, Nil)
val skipErrorPaths: Boolean = config.getBoolean(SkipErrorPaths, defValue = false)
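// Build an optional user defined schema from the `schema` config; used only when it yields a non-empty struct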
val currentSchema: Option[StructType] = Try(getUserDefinedSchema) match {
case Success(structType) if structType.fields.nonEmpty => Some(structType)
case _ => None
}
assert(SupportedFormats.contains(format),
s"Invalid format '$format' specified. Must be one of ${SupportedFormats.mkString("['", "', '", "']")}")
if (format == "csv") validateCSVOptions()
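// TSV is not a native Spark format; it is read as CSV with a tab delimiter unless a delimiter is explicitly configured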
if (format == "tsv") {
format = "csv"
options.getOrElseUpdate(Delimiter, TabDelimiter)
}
/**
* Builds a [[StructType]] from the given schema string provided as `Schema` config.
*
* @example
* {"schema":[{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]}
* {"schema":[{"name":"user_id","type":"decimal(5,2)","nullable":"true"}]}
* {"schema":[{"name":"my_struct","type":"struct<f1:int,f2:string>","nullable":"true"}]}
* @return a [[StructType]] built from the user provided column definitions
*/
private def getUserDefinedSchema: StructType = {
schemaSeq.foldLeft(new StructType())((currentStruct, fieldMap) => {
val colName = fieldMap(ColName).toLowerCase
val colType = fieldMap(ColType).toLowerCase
val isNullable = Try(fieldMap(IsNullable).toLowerCase.toBoolean).getOrElse(true)
currentStruct.add(colName, colType, isNullable)
})
}
/**
* Ensures that a schema can be derived, either via the `header` option or the `schema` config.
*
* - If both are present, preference is given to `schema`. The first row is omitted
* if `header` is set to true, otherwise it is included.
* - If `schema` is defined, it must be valid.
* - If neither is set, a fatal exception is thrown.
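*
* @example A hypothetical header-only CSV configuration (the path is illustrative), which derives column
* names from the first row instead of an explicit `schema`:
* {{{
*   {"format": "csv", "paths": ["hdfs:///path/to/data.csv"], "options": {"header": "true"}}
* }}}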
*/
private def validateCSVOptions(): Unit = {
if (options.contains(Header) && config.contains(Schema)) {
griffinLogger.warn(s"Both $Options.$Header and $Schema were provided. Defaulting to provided $Schema")
}
if (!options.contains(Header) && !config.contains(Schema)) {
throw new IllegalArgumentException(s"Either '$Header' must be set in '$Options' or '$Schema' must be set.")
}
if (config.contains(Schema) && (schemaSeq.isEmpty || currentSchema.isEmpty)) {
throw new IllegalStateException("Unable to create schema from specification")
}
}
def data(ms: Long): (Option[DataFrame], TimeRange) = {
  val validPaths = getValidPaths(paths, skipErrorPaths)

  val dfOpt = {
    val rawDfOpt = Some(
      sparkSession.read
        .options(options)
        .format(format)
        .withSchemaIfAny(currentSchema)
        .load(validPaths: _*)
    )
    preProcess(rawDfOpt, ms)
  }

  (dfOpt, TimeRange(ms, readTmst(ms)))
}
}
object FileBasedDataConnector extends Loggable {
private val Format: String = "format"
private val Paths: String = "paths"
private val Options: String = "options"
private val SkipErrorPaths: String = "skipErrorPaths"
private val Schema: String = "schema"
private val Header: String = "header"
private val Delimiter: String = "delimiter"
private val ColName: String = "name"
private val ColType: String = "type"
private val IsNullable: String = "nullable"
private val TabDelimiter: String = "\t"
private val DefaultFormat: String = SQLConf.DEFAULT_DATA_SOURCE_NAME.defaultValueString
private val SupportedFormats: Seq[String] = Seq("parquet", "orc", "avro", "text", "csv", "tsv")
/**
* Validates the existence of paths in a given sequence.
* Set option `skipOnError` to true to avoid fatal errors if any erroneous paths are encountered.
*
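* @example A hypothetical call (the paths are illustrative). With `skipOnError = true` a missing path is
* logged and dropped; with `false` an [[IllegalArgumentException]] is thrown:
* {{{
*   getValidPaths(Seq("hdfs:///data/day=01", "hdfs:///data/day=02"), skipOnError = true)
* }}}
*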
* @param paths given sequence of paths
* @param skipOnError flag to skip erroneous paths if any
* @return the subset of the given paths that actually exist
*/
private def getValidPaths(paths: Seq[String], skipOnError: Boolean): Seq[String] = {
val validPaths = paths.filter(path =>
if (HdfsUtil.existPath(path)) true
else {
val msg = s"Path '$path' does not exist!"
if (skipOnError) griffinLogger.error(msg)
else throw new IllegalArgumentException(msg)
false
}
)
assert(validPaths.nonEmpty, "No valid paths were found for the data source.")
validPaths
}
/**
* Adds methods implicitly to [[DataFrameReader]]
*
* @param dfr an instance of [[DataFrameReader]]
*/
implicit class Implicits(dfr: DataFrameReader) {
/**
* Applies a schema to this [[DataFrameReader]], if one is defined.
*
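* @example A hypothetical usage (assumes a [[SparkSession]] named `sparkSession` is in scope; the path is
* illustrative); passing `None` returns the reader unchanged:
* {{{
*   import FileBasedDataConnector._
*   val schemaOpt = Some(new StructType().add("user_id", "string"))
*   sparkSession.read.format("csv").withSchemaIfAny(schemaOpt).load("hdfs:///path/to/data.csv")
* }}}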
* @param schemaOpt an optional Schema
* @return the [[DataFrameReader]] with the schema applied, if one was provided
*/
def withSchemaIfAny(schemaOpt: Option[StructType]): DataFrameReader = {
schemaOpt match {
case Some(structType) => dfr.schema(structType)
case None => dfr
}
}
}
}